1pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91 crossbeam_channel::{unbounded, Receiver, Sender},
92 log::*,
93 serde::de::DeserializeOwned,
94 serde_json::{
95 json,
96 value::Value::{Number, Object},
97 Map, Value,
98 },
99 solana_account_decoder::UiAccount,
100 solana_rpc_client_api::{
101 config::{
102 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
103 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
104 RpcTransactionLogsFilter,
105 },
106 response::{
107 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
108 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
109 },
110 },
111 solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
112 std::{
113 marker::PhantomData,
114 net::TcpStream,
115 sync::{
116 atomic::{AtomicBool, Ordering},
117 Arc, RwLock,
118 },
119 thread::{sleep, JoinHandle},
120 time::Duration,
121 },
122 tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
123 url::Url,
124};
125
126pub struct PubsubClientSubscription<T>
132where
133 T: DeserializeOwned,
134{
135 message_type: PhantomData<T>,
136 operation: &'static str,
137 socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
138 subscription_id: u64,
139 t_cleanup: Option<JoinHandle<()>>,
140 exit: Arc<AtomicBool>,
141}
142
143impl<T> Drop for PubsubClientSubscription<T>
144where
145 T: DeserializeOwned,
146{
147 fn drop(&mut self) {
148 self.send_unsubscribe()
149 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
150 self.socket
151 .write()
152 .unwrap()
153 .close(None)
154 .unwrap_or_else(|_| warn!("unable to close websocket"));
155 }
156}
157
158impl<T> PubsubClientSubscription<T>
159where
160 T: DeserializeOwned,
161{
162 fn send_subscribe(
163 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
164 body: String,
165 ) -> Result<u64, PubsubClientError> {
166 writable_socket.write().unwrap().send(Message::Text(body))?;
167 let message = writable_socket.write().unwrap().read()?;
168 Self::extract_subscription_id(message)
169 }
170
171 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
172 let message_text = &message.into_text()?;
173
174 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
175 if let Some(Number(x)) = json_msg.get("result") {
176 if let Some(x) = x.as_u64() {
177 return Ok(x);
178 }
179 }
180 }
181
182 Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
183 "msg={message_text}"
184 )))
185 }
186
187 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
196 let method = format!("{}Unsubscribe", self.operation);
197 self.socket
198 .write()
199 .unwrap()
200 .send(Message::Text(
201 json!({
202 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
203 })
204 .to_string(),
205 ))
206 .map_err(|err| err.into())
207 }
208
209 fn read_message(
210 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
211 ) -> Result<Option<T>, PubsubClientError> {
212 let message = writable_socket.write().unwrap().read()?;
213 if message.is_ping() {
214 return Ok(None);
215 }
216 let message_text = &message.into_text()?;
217 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
218 if let Some(Object(params)) = json_msg.get("params") {
219 if let Some(result) = params.get("result") {
220 if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
221 return Ok(Some(x));
222 }
223 }
224 }
225 }
226
227 Err(PubsubClientError::UnexpectedMessageError(format!(
228 "msg={message_text}"
229 )))
230 }
231
232 pub fn shutdown(&mut self) -> std::thread::Result<()> {
241 if self.t_cleanup.is_some() {
242 info!("websocket thread - shutting down");
243 self.exit.store(true, Ordering::Relaxed);
244 let x = self.t_cleanup.take().unwrap().join();
245 info!("websocket thread - shut down.");
246 x
247 } else {
248 warn!("websocket thread - already shut down.");
249 Ok(())
250 }
251 }
252}
253
254pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
255pub type LogsSubscription = (
256 PubsubLogsClientSubscription,
257 Receiver<RpcResponse<RpcLogsResponse>>,
258);
259
260pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
261pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
262
263pub type PubsubSignatureClientSubscription =
264 PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
265pub type SignatureSubscription = (
266 PubsubSignatureClientSubscription,
267 Receiver<RpcResponse<RpcSignatureResult>>,
268);
269
270pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
271pub type BlockSubscription = (
272 PubsubBlockClientSubscription,
273 Receiver<RpcResponse<RpcBlockUpdate>>,
274);
275
276pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
277pub type ProgramSubscription = (
278 PubsubProgramClientSubscription,
279 Receiver<RpcResponse<RpcKeyedAccount>>,
280);
281
282pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
283pub type AccountSubscription = (
284 PubsubAccountClientSubscription,
285 Receiver<RpcResponse<UiAccount>>,
286);
287
288pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
289pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);
290
291pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
292pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
293
294pub struct PubsubClient {}
298
299fn connect_with_retry(
300 url: Url,
301) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
302 let mut connection_retries = 5;
303 loop {
304 let result = connect(url.clone()).map(|(socket, _)| socket);
305 if let Err(tungstenite::Error::Http(response)) = &result {
306 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
307 {
308 let mut duration = Duration::from_millis(500);
309 if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
310 if let Ok(retry_after) = retry_after.to_str() {
311 if let Ok(retry_after) = retry_after.parse::<u64>() {
312 if retry_after < 120 {
313 duration = Duration::from_secs(retry_after);
314 }
315 }
316 }
317 }
318
319 connection_retries -= 1;
320 debug!(
321 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
322 response, connection_retries, duration
323 );
324
325 sleep(duration);
326 continue;
327 }
328 }
329 return result;
330 }
331}
332
333impl PubsubClient {
334 pub fn account_subscribe(
344 url: &str,
345 pubkey: &Pubkey,
346 config: Option<RpcAccountInfoConfig>,
347 ) -> Result<AccountSubscription, PubsubClientError> {
348 let url = Url::parse(url)?;
349 let socket = connect_with_retry(url)?;
350 let (sender, receiver) = unbounded();
351
352 let socket = Arc::new(RwLock::new(socket));
353 let socket_clone = socket.clone();
354 let exit = Arc::new(AtomicBool::new(false));
355 let exit_clone = exit.clone();
356 let body = json!({
357 "jsonrpc":"2.0",
358 "id":1,
359 "method":"accountSubscribe",
360 "params":[
361 pubkey.to_string(),
362 config
363 ]
364 })
365 .to_string();
366 let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
367
368 let t_cleanup = std::thread::spawn(move || {
369 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
370 });
371
372 let result = PubsubClientSubscription {
373 message_type: PhantomData,
374 operation: "account",
375 socket,
376 subscription_id,
377 t_cleanup: Some(t_cleanup),
378 exit,
379 };
380
381 Ok((result, receiver))
382 }
383
384 pub fn block_subscribe(
397 url: &str,
398 filter: RpcBlockSubscribeFilter,
399 config: Option<RpcBlockSubscribeConfig>,
400 ) -> Result<BlockSubscription, PubsubClientError> {
401 let url = Url::parse(url)?;
402 let socket = connect_with_retry(url)?;
403 let (sender, receiver) = unbounded();
404
405 let socket = Arc::new(RwLock::new(socket));
406 let socket_clone = socket.clone();
407 let exit = Arc::new(AtomicBool::new(false));
408 let exit_clone = exit.clone();
409 let body = json!({
410 "jsonrpc":"2.0",
411 "id":1,
412 "method":"blockSubscribe",
413 "params":[filter, config]
414 })
415 .to_string();
416
417 let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
418
419 let t_cleanup = std::thread::spawn(move || {
420 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
421 });
422
423 let result = PubsubClientSubscription {
424 message_type: PhantomData,
425 operation: "block",
426 socket,
427 subscription_id,
428 t_cleanup: Some(t_cleanup),
429 exit,
430 };
431
432 Ok((result, receiver))
433 }
434
435 pub fn logs_subscribe(
445 url: &str,
446 filter: RpcTransactionLogsFilter,
447 config: RpcTransactionLogsConfig,
448 ) -> Result<LogsSubscription, PubsubClientError> {
449 let url = Url::parse(url)?;
450 let socket = connect_with_retry(url)?;
451 let (sender, receiver) = unbounded();
452
453 let socket = Arc::new(RwLock::new(socket));
454 let socket_clone = socket.clone();
455 let exit = Arc::new(AtomicBool::new(false));
456 let exit_clone = exit.clone();
457 let body = json!({
458 "jsonrpc":"2.0",
459 "id":1,
460 "method":"logsSubscribe",
461 "params":[filter, config]
462 })
463 .to_string();
464
465 let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
466
467 let t_cleanup = std::thread::spawn(move || {
468 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
469 });
470
471 let result = PubsubClientSubscription {
472 message_type: PhantomData,
473 operation: "logs",
474 socket,
475 subscription_id,
476 t_cleanup: Some(t_cleanup),
477 exit,
478 };
479
480 Ok((result, receiver))
481 }
482
483 pub fn program_subscribe(
494 url: &str,
495 pubkey: &Pubkey,
496 config: Option<RpcProgramAccountsConfig>,
497 ) -> Result<ProgramSubscription, PubsubClientError> {
498 let url = Url::parse(url)?;
499 let socket = connect_with_retry(url)?;
500 let (sender, receiver) = unbounded();
501
502 let socket = Arc::new(RwLock::new(socket));
503 let socket_clone = socket.clone();
504 let exit = Arc::new(AtomicBool::new(false));
505 let exit_clone = exit.clone();
506
507 let body = json!({
508 "jsonrpc":"2.0",
509 "id":1,
510 "method":"programSubscribe",
511 "params":[
512 pubkey.to_string(),
513 config
514 ]
515 })
516 .to_string();
517 let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
518
519 let t_cleanup = std::thread::spawn(move || {
520 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
521 });
522
523 let result = PubsubClientSubscription {
524 message_type: PhantomData,
525 operation: "program",
526 socket,
527 subscription_id,
528 t_cleanup: Some(t_cleanup),
529 exit,
530 };
531
532 Ok((result, receiver))
533 }
534
535 pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
549 let url = Url::parse(url)?;
550 let socket = connect_with_retry(url)?;
551 let (sender, receiver) = unbounded();
552
553 let socket = Arc::new(RwLock::new(socket));
554 let socket_clone = socket.clone();
555 let exit = Arc::new(AtomicBool::new(false));
556 let exit_clone = exit.clone();
557 let body = json!({
558 "jsonrpc":"2.0",
559 "id":1,
560 "method":"voteSubscribe",
561 })
562 .to_string();
563 let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
564
565 let t_cleanup = std::thread::spawn(move || {
566 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
567 });
568
569 let result = PubsubClientSubscription {
570 message_type: PhantomData,
571 operation: "vote",
572 socket,
573 subscription_id,
574 t_cleanup: Some(t_cleanup),
575 exit,
576 };
577
578 Ok((result, receiver))
579 }
580
581 pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
594 let url = Url::parse(url)?;
595 let socket = connect_with_retry(url)?;
596 let (sender, receiver) = unbounded();
597
598 let socket = Arc::new(RwLock::new(socket));
599 let socket_clone = socket.clone();
600 let exit = Arc::new(AtomicBool::new(false));
601 let exit_clone = exit.clone();
602 let body = json!({
603 "jsonrpc":"2.0",
604 "id":1,
605 "method":"rootSubscribe",
606 })
607 .to_string();
608 let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
609
610 let t_cleanup = std::thread::spawn(move || {
611 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
612 });
613
614 let result = PubsubClientSubscription {
615 message_type: PhantomData,
616 operation: "root",
617 socket,
618 subscription_id,
619 t_cleanup: Some(t_cleanup),
620 exit,
621 };
622
623 Ok((result, receiver))
624 }
625
626 pub fn signature_subscribe(
640 url: &str,
641 signature: &Signature,
642 config: Option<RpcSignatureSubscribeConfig>,
643 ) -> Result<SignatureSubscription, PubsubClientError> {
644 let url = Url::parse(url)?;
645 let socket = connect_with_retry(url)?;
646 let (sender, receiver) = unbounded();
647
648 let socket = Arc::new(RwLock::new(socket));
649 let socket_clone = socket.clone();
650 let exit = Arc::new(AtomicBool::new(false));
651 let exit_clone = exit.clone();
652 let body = json!({
653 "jsonrpc":"2.0",
654 "id":1,
655 "method":"signatureSubscribe",
656 "params":[
657 signature.to_string(),
658 config
659 ]
660 })
661 .to_string();
662 let subscription_id =
663 PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
664
665 let t_cleanup = std::thread::spawn(move || {
666 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
667 });
668
669 let result = PubsubClientSubscription {
670 message_type: PhantomData,
671 operation: "signature",
672 socket,
673 subscription_id,
674 t_cleanup: Some(t_cleanup),
675 exit,
676 };
677
678 Ok((result, receiver))
679 }
680
681 pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
691 let url = Url::parse(url)?;
692 let socket = connect_with_retry(url)?;
693 let (sender, receiver) = unbounded::<SlotInfo>();
694
695 let socket = Arc::new(RwLock::new(socket));
696 let socket_clone = socket.clone();
697 let exit = Arc::new(AtomicBool::new(false));
698 let exit_clone = exit.clone();
699 let body = json!({
700 "jsonrpc":"2.0",
701 "id":1,
702 "method":"slotSubscribe",
703 "params":[]
704 })
705 .to_string();
706 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
707
708 let t_cleanup = std::thread::spawn(move || {
709 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
710 });
711
712 let result = PubsubClientSubscription {
713 message_type: PhantomData,
714 operation: "slot",
715 socket,
716 subscription_id,
717 t_cleanup: Some(t_cleanup),
718 exit,
719 };
720
721 Ok((result, receiver))
722 }
723
724 pub fn slot_updates_subscribe(
739 url: &str,
740 handler: impl Fn(SlotUpdate) + Send + 'static,
741 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
742 let url = Url::parse(url)?;
743 let socket = connect_with_retry(url)?;
744
745 let socket = Arc::new(RwLock::new(socket));
746 let socket_clone = socket.clone();
747 let exit = Arc::new(AtomicBool::new(false));
748 let exit_clone = exit.clone();
749 let body = json!({
750 "jsonrpc":"2.0",
751 "id":1,
752 "method":"slotsUpdatesSubscribe",
753 "params":[]
754 })
755 .to_string();
756 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
757
758 let t_cleanup = std::thread::spawn(move || {
759 Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
760 });
761
762 Ok(PubsubClientSubscription {
763 message_type: PhantomData,
764 operation: "slotsUpdates",
765 socket,
766 subscription_id,
767 t_cleanup: Some(t_cleanup),
768 exit,
769 })
770 }
771
772 fn cleanup_with_sender<T>(
773 exit: Arc<AtomicBool>,
774 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
775 sender: Sender<T>,
776 ) where
777 T: DeserializeOwned + Send + 'static,
778 {
779 let handler = move |message| match sender.send(message) {
780 Ok(_) => (),
781 Err(err) => {
782 info!("receive error: {:?}", err);
783 }
784 };
785 Self::cleanup_with_handler(exit, socket, handler);
786 }
787
788 fn cleanup_with_handler<T, F>(
789 exit: Arc<AtomicBool>,
790 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
791 handler: F,
792 ) where
793 T: DeserializeOwned,
794 F: Fn(T) + Send + 'static,
795 {
796 loop {
797 if exit.load(Ordering::Relaxed) {
798 break;
799 }
800
801 match PubsubClientSubscription::read_message(socket) {
802 Ok(Some(message)) => handler(message),
803 Ok(None) => {
804 }
806 Err(err) => {
807 info!("receive error: {:?}", err);
808 break;
809 }
810 }
811 }
812
813 info!("websocket - exited receive loop");
814 }
815}
816
817#[cfg(test)]
818mod tests {
819 }