1pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91 crossbeam_channel::{unbounded, Receiver, Sender},
92 log::*,
93 serde::de::DeserializeOwned,
94 serde_json::{
95 json,
96 value::Value::{Number, Object},
97 Map, Value,
98 },
99 solana_account_decoder_client_types::UiAccount,
100 solana_clock::Slot,
101 solana_pubkey::Pubkey,
102 solana_rpc_client_api::{
103 config::{
104 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106 RpcTransactionLogsFilter,
107 },
108 response::{
109 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111 },
112 },
113 solana_signature::Signature,
114 std::{
115 marker::PhantomData,
116 net::TcpStream,
117 sync::{
118 atomic::{AtomicBool, Ordering},
119 Arc, RwLock,
120 },
121 thread::{sleep, JoinHandle},
122 time::Duration,
123 },
124 tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
125 url::Url,
126};
127
/// Handle to one websocket subscription whose notifications deserialize into `T`.
///
/// The handle owns the shared socket together with the background thread that
/// reads notifications from it. Dropping the handle sends the matching
/// unsubscribe request and closes the socket.
pub struct PubsubClientSubscription<T>
where
    T: DeserializeOwned,
{
    /// Ties the handle to its notification payload type without storing a value of it.
    message_type: PhantomData<T>,
    /// RPC method stem (for example `"account"`); `send_unsubscribe` appends
    /// `Unsubscribe` to it to form the method name.
    operation: &'static str,
    /// Websocket shared between this handle and the background receive thread.
    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
    /// Identifier extracted from the server's reply to the subscribe request.
    subscription_id: u64,
    /// Join handle of the background receive thread; `None` once `shutdown` has taken it.
    t_cleanup: Option<JoinHandle<()>>,
    /// Flag the receive loop checks before each read to decide whether to stop.
    exit: Arc<AtomicBool>,
}
144
145impl<T> Drop for PubsubClientSubscription<T>
146where
147 T: DeserializeOwned,
148{
149 fn drop(&mut self) {
150 self.send_unsubscribe()
151 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
152 self.socket
153 .write()
154 .unwrap()
155 .close(None)
156 .unwrap_or_else(|_| warn!("unable to close websocket"));
157 }
158}
159
160impl<T> PubsubClientSubscription<T>
161where
162 T: DeserializeOwned,
163{
164 fn send_subscribe(
165 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
166 body: String,
167 ) -> Result<u64, PubsubClientError> {
168 writable_socket.write().unwrap().send(Message::Text(body))?;
169 let message = writable_socket.write().unwrap().read()?;
170 Self::extract_subscription_id(message)
171 }
172
173 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
174 let message_text = &message.into_text()?;
175
176 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
177 if let Some(Number(x)) = json_msg.get("result") {
178 if let Some(x) = x.as_u64() {
179 return Ok(x);
180 }
181 }
182 }
183
184 Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
185 "msg={message_text}"
186 )))
187 }
188
189 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
198 let method = format!("{}Unsubscribe", self.operation);
199 self.socket
200 .write()
201 .unwrap()
202 .send(Message::Text(
203 json!({
204 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
205 })
206 .to_string(),
207 ))
208 .map_err(|err| err.into())
209 }
210
211 fn read_message(
212 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
213 ) -> Result<Option<T>, PubsubClientError> {
214 let message = writable_socket.write().unwrap().read()?;
215 if message.is_ping() {
216 return Ok(None);
217 }
218 let message_text = &message.into_text()?;
219 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
220 if let Some(Object(params)) = json_msg.get("params") {
221 if let Some(result) = params.get("result") {
222 if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
223 return Ok(Some(x));
224 }
225 }
226 }
227 }
228
229 Err(PubsubClientError::UnexpectedMessageError(format!(
230 "msg={message_text}"
231 )))
232 }
233
234 pub fn shutdown(&mut self) -> std::thread::Result<()> {
243 if self.t_cleanup.is_some() {
244 info!("websocket thread - shutting down");
245 self.exit.store(true, Ordering::Relaxed);
246 let x = self.t_cleanup.take().unwrap().join();
247 info!("websocket thread - shut down.");
248 x
249 } else {
250 warn!("websocket thread - already shut down.");
251 Ok(())
252 }
253 }
254}
255
/// Subscription handle whose notifications are `RpcResponse<RpcLogsResponse>` values.
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
/// Logs subscription handle paired with the channel receiver for its notifications.
pub type LogsSubscription = (
    PubsubLogsClientSubscription,
    Receiver<RpcResponse<RpcLogsResponse>>,
);

/// Subscription handle whose notifications are `SlotInfo` values.
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
/// Slot subscription handle paired with the channel receiver for its notifications.
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);

/// Subscription handle whose notifications are `RpcResponse<RpcSignatureResult>` values.
pub type PubsubSignatureClientSubscription =
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
/// Signature subscription handle paired with the channel receiver for its notifications.
pub type SignatureSubscription = (
    PubsubSignatureClientSubscription,
    Receiver<RpcResponse<RpcSignatureResult>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcBlockUpdate>` values.
pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
/// Block subscription handle paired with the channel receiver for its notifications.
pub type BlockSubscription = (
    PubsubBlockClientSubscription,
    Receiver<RpcResponse<RpcBlockUpdate>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcKeyedAccount>` values.
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
/// Program subscription handle paired with the channel receiver for its notifications.
pub type ProgramSubscription = (
    PubsubProgramClientSubscription,
    Receiver<RpcResponse<RpcKeyedAccount>>,
);

/// Subscription handle whose notifications are `RpcResponse<UiAccount>` values.
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
/// Account subscription handle paired with the channel receiver for its notifications.
pub type AccountSubscription = (
    PubsubAccountClientSubscription,
    Receiver<RpcResponse<UiAccount>>,
);

/// Subscription handle whose notifications are `RpcVote` values.
pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
/// Vote subscription handle paired with the channel receiver for its notifications.
pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);

/// Subscription handle whose notifications are `Slot` values.
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
/// Root subscription handle paired with the channel receiver for its notifications.
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
295
/// Stateless entry point for the blocking pubsub API.
///
/// The type has no fields; its associated functions each open their own
/// websocket connection and return a subscription handle.
pub struct PubsubClient {}
300
301fn connect_with_retry(
302 url: Url,
303) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
304 let mut connection_retries = 5;
305 loop {
306 let result = connect(url.clone()).map(|(socket, _)| socket);
307 if let Err(tungstenite::Error::Http(response)) = &result {
308 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
309 {
310 let mut duration = Duration::from_millis(500);
311 if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
312 if let Ok(retry_after) = retry_after.to_str() {
313 if let Ok(retry_after) = retry_after.parse::<u64>() {
314 if retry_after < 120 {
315 duration = Duration::from_secs(retry_after);
316 }
317 }
318 }
319 }
320
321 connection_retries -= 1;
322 debug!(
323 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
324 response, connection_retries, duration
325 );
326
327 sleep(duration);
328 continue;
329 }
330 }
331 return result;
332 }
333}
334
335impl PubsubClient {
336 pub fn account_subscribe(
346 url: &str,
347 pubkey: &Pubkey,
348 config: Option<RpcAccountInfoConfig>,
349 ) -> Result<AccountSubscription, PubsubClientError> {
350 let url = Url::parse(url)?;
351 let socket = connect_with_retry(url)?;
352 let (sender, receiver) = unbounded();
353
354 let socket = Arc::new(RwLock::new(socket));
355 let socket_clone = socket.clone();
356 let exit = Arc::new(AtomicBool::new(false));
357 let exit_clone = exit.clone();
358 let body = json!({
359 "jsonrpc":"2.0",
360 "id":1,
361 "method":"accountSubscribe",
362 "params":[
363 pubkey.to_string(),
364 config
365 ]
366 })
367 .to_string();
368 let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
369
370 let t_cleanup = std::thread::spawn(move || {
371 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
372 });
373
374 let result = PubsubClientSubscription {
375 message_type: PhantomData,
376 operation: "account",
377 socket,
378 subscription_id,
379 t_cleanup: Some(t_cleanup),
380 exit,
381 };
382
383 Ok((result, receiver))
384 }
385
386 pub fn block_subscribe(
399 url: &str,
400 filter: RpcBlockSubscribeFilter,
401 config: Option<RpcBlockSubscribeConfig>,
402 ) -> Result<BlockSubscription, PubsubClientError> {
403 let url = Url::parse(url)?;
404 let socket = connect_with_retry(url)?;
405 let (sender, receiver) = unbounded();
406
407 let socket = Arc::new(RwLock::new(socket));
408 let socket_clone = socket.clone();
409 let exit = Arc::new(AtomicBool::new(false));
410 let exit_clone = exit.clone();
411 let body = json!({
412 "jsonrpc":"2.0",
413 "id":1,
414 "method":"blockSubscribe",
415 "params":[filter, config]
416 })
417 .to_string();
418
419 let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
420
421 let t_cleanup = std::thread::spawn(move || {
422 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
423 });
424
425 let result = PubsubClientSubscription {
426 message_type: PhantomData,
427 operation: "block",
428 socket,
429 subscription_id,
430 t_cleanup: Some(t_cleanup),
431 exit,
432 };
433
434 Ok((result, receiver))
435 }
436
437 pub fn logs_subscribe(
447 url: &str,
448 filter: RpcTransactionLogsFilter,
449 config: RpcTransactionLogsConfig,
450 ) -> Result<LogsSubscription, PubsubClientError> {
451 let url = Url::parse(url)?;
452 let socket = connect_with_retry(url)?;
453 let (sender, receiver) = unbounded();
454
455 let socket = Arc::new(RwLock::new(socket));
456 let socket_clone = socket.clone();
457 let exit = Arc::new(AtomicBool::new(false));
458 let exit_clone = exit.clone();
459 let body = json!({
460 "jsonrpc":"2.0",
461 "id":1,
462 "method":"logsSubscribe",
463 "params":[filter, config]
464 })
465 .to_string();
466
467 let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
468
469 let t_cleanup = std::thread::spawn(move || {
470 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
471 });
472
473 let result = PubsubClientSubscription {
474 message_type: PhantomData,
475 operation: "logs",
476 socket,
477 subscription_id,
478 t_cleanup: Some(t_cleanup),
479 exit,
480 };
481
482 Ok((result, receiver))
483 }
484
485 pub fn program_subscribe(
496 url: &str,
497 pubkey: &Pubkey,
498 config: Option<RpcProgramAccountsConfig>,
499 ) -> Result<ProgramSubscription, PubsubClientError> {
500 let url = Url::parse(url)?;
501 let socket = connect_with_retry(url)?;
502 let (sender, receiver) = unbounded();
503
504 let socket = Arc::new(RwLock::new(socket));
505 let socket_clone = socket.clone();
506 let exit = Arc::new(AtomicBool::new(false));
507 let exit_clone = exit.clone();
508
509 let body = json!({
510 "jsonrpc":"2.0",
511 "id":1,
512 "method":"programSubscribe",
513 "params":[
514 pubkey.to_string(),
515 config
516 ]
517 })
518 .to_string();
519 let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
520
521 let t_cleanup = std::thread::spawn(move || {
522 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
523 });
524
525 let result = PubsubClientSubscription {
526 message_type: PhantomData,
527 operation: "program",
528 socket,
529 subscription_id,
530 t_cleanup: Some(t_cleanup),
531 exit,
532 };
533
534 Ok((result, receiver))
535 }
536
537 pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
551 let url = Url::parse(url)?;
552 let socket = connect_with_retry(url)?;
553 let (sender, receiver) = unbounded();
554
555 let socket = Arc::new(RwLock::new(socket));
556 let socket_clone = socket.clone();
557 let exit = Arc::new(AtomicBool::new(false));
558 let exit_clone = exit.clone();
559 let body = json!({
560 "jsonrpc":"2.0",
561 "id":1,
562 "method":"voteSubscribe",
563 })
564 .to_string();
565 let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
566
567 let t_cleanup = std::thread::spawn(move || {
568 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
569 });
570
571 let result = PubsubClientSubscription {
572 message_type: PhantomData,
573 operation: "vote",
574 socket,
575 subscription_id,
576 t_cleanup: Some(t_cleanup),
577 exit,
578 };
579
580 Ok((result, receiver))
581 }
582
583 pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
596 let url = Url::parse(url)?;
597 let socket = connect_with_retry(url)?;
598 let (sender, receiver) = unbounded();
599
600 let socket = Arc::new(RwLock::new(socket));
601 let socket_clone = socket.clone();
602 let exit = Arc::new(AtomicBool::new(false));
603 let exit_clone = exit.clone();
604 let body = json!({
605 "jsonrpc":"2.0",
606 "id":1,
607 "method":"rootSubscribe",
608 })
609 .to_string();
610 let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
611
612 let t_cleanup = std::thread::spawn(move || {
613 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
614 });
615
616 let result = PubsubClientSubscription {
617 message_type: PhantomData,
618 operation: "root",
619 socket,
620 subscription_id,
621 t_cleanup: Some(t_cleanup),
622 exit,
623 };
624
625 Ok((result, receiver))
626 }
627
628 pub fn signature_subscribe(
642 url: &str,
643 signature: &Signature,
644 config: Option<RpcSignatureSubscribeConfig>,
645 ) -> Result<SignatureSubscription, PubsubClientError> {
646 let url = Url::parse(url)?;
647 let socket = connect_with_retry(url)?;
648 let (sender, receiver) = unbounded();
649
650 let socket = Arc::new(RwLock::new(socket));
651 let socket_clone = socket.clone();
652 let exit = Arc::new(AtomicBool::new(false));
653 let exit_clone = exit.clone();
654 let body = json!({
655 "jsonrpc":"2.0",
656 "id":1,
657 "method":"signatureSubscribe",
658 "params":[
659 signature.to_string(),
660 config
661 ]
662 })
663 .to_string();
664 let subscription_id =
665 PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
666
667 let t_cleanup = std::thread::spawn(move || {
668 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
669 });
670
671 let result = PubsubClientSubscription {
672 message_type: PhantomData,
673 operation: "signature",
674 socket,
675 subscription_id,
676 t_cleanup: Some(t_cleanup),
677 exit,
678 };
679
680 Ok((result, receiver))
681 }
682
683 pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
693 let url = Url::parse(url)?;
694 let socket = connect_with_retry(url)?;
695 let (sender, receiver) = unbounded::<SlotInfo>();
696
697 let socket = Arc::new(RwLock::new(socket));
698 let socket_clone = socket.clone();
699 let exit = Arc::new(AtomicBool::new(false));
700 let exit_clone = exit.clone();
701 let body = json!({
702 "jsonrpc":"2.0",
703 "id":1,
704 "method":"slotSubscribe",
705 "params":[]
706 })
707 .to_string();
708 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
709
710 let t_cleanup = std::thread::spawn(move || {
711 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
712 });
713
714 let result = PubsubClientSubscription {
715 message_type: PhantomData,
716 operation: "slot",
717 socket,
718 subscription_id,
719 t_cleanup: Some(t_cleanup),
720 exit,
721 };
722
723 Ok((result, receiver))
724 }
725
726 pub fn slot_updates_subscribe(
741 url: &str,
742 handler: impl Fn(SlotUpdate) + Send + 'static,
743 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
744 let url = Url::parse(url)?;
745 let socket = connect_with_retry(url)?;
746
747 let socket = Arc::new(RwLock::new(socket));
748 let socket_clone = socket.clone();
749 let exit = Arc::new(AtomicBool::new(false));
750 let exit_clone = exit.clone();
751 let body = json!({
752 "jsonrpc":"2.0",
753 "id":1,
754 "method":"slotsUpdatesSubscribe",
755 "params":[]
756 })
757 .to_string();
758 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
759
760 let t_cleanup = std::thread::spawn(move || {
761 Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
762 });
763
764 Ok(PubsubClientSubscription {
765 message_type: PhantomData,
766 operation: "slotsUpdates",
767 socket,
768 subscription_id,
769 t_cleanup: Some(t_cleanup),
770 exit,
771 })
772 }
773
774 fn cleanup_with_sender<T>(
775 exit: Arc<AtomicBool>,
776 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
777 sender: Sender<T>,
778 ) where
779 T: DeserializeOwned + Send + 'static,
780 {
781 let handler = move |message| match sender.send(message) {
782 Ok(_) => (),
783 Err(err) => {
784 info!("receive error: {:?}", err);
785 }
786 };
787 Self::cleanup_with_handler(exit, socket, handler);
788 }
789
790 fn cleanup_with_handler<T, F>(
791 exit: Arc<AtomicBool>,
792 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
793 handler: F,
794 ) where
795 T: DeserializeOwned,
796 F: Fn(T) + Send + 'static,
797 {
798 loop {
799 if exit.load(Ordering::Relaxed) {
800 break;
801 }
802
803 match PubsubClientSubscription::read_message(socket) {
804 Ok(Some(message)) => handler(message),
805 Ok(None) => {
806 }
808 Err(err) => {
809 info!("receive error: {:?}", err);
810 break;
811 }
812 }
813 }
814
815 info!("websocket - exited receive loop");
816 }
817}
818
// Unit-test module, compiled only for test builds; no test cases are visible here.
#[cfg(test)]
mod tests {
}