1use {
2 crate::{
3 rpc_config::{
4 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
5 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
6 RpcTransactionLogsFilter,
7 },
8 rpc_filter,
9 rpc_response::{
10 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
11 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
12 },
13 },
14 crossbeam_channel::{unbounded, Receiver, Sender},
15 log::*,
16 serde::de::DeserializeOwned,
17 serde_json::{
18 json,
19 value::Value::{Number, Object},
20 Map, Value,
21 },
22 safecoin_account_decoder::UiAccount,
23 solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
24 std::{
25 marker::PhantomData,
26 net::TcpStream,
27 sync::{
28 atomic::{AtomicBool, Ordering},
29 Arc, RwLock,
30 },
31 thread::{sleep, JoinHandle},
32 time::Duration,
33 },
34 thiserror::Error,
35 tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
36 url::{ParseError, Url},
37};
38
/// Errors produced while connecting to, subscribing on, or reading from a
/// pubsub websocket.
#[derive(Debug, Error)]
pub enum PubsubClientError {
    /// The endpoint string could not be parsed as a URL.
    #[error("url parse error")]
    UrlParseError(#[from] ParseError),

    /// Any `tungstenite::Error`. Because of the `#[from]` conversion, every
    /// websocket error propagated with `?` in this file (connect, write and
    /// read alike) is reported through this variant.
    #[error("unable to connect to server")]
    ConnectionError(#[from] tungstenite::Error),

    /// A message body or payload could not be (de)serialized by `serde_json`.
    #[error("json parse error")]
    JsonParseError(#[from] serde_json::error::Error),

    /// A message parsed as JSON but did not have the expected fields; the
    /// string carries the debug rendering of the received JSON object.
    #[error("unexpected message format: {0}")]
    UnexpectedMessageError(String),

    /// A request could not be prepared or its reply could not be interpreted;
    /// the string carries the underlying description.
    #[error("request error: {0}")]
    RequestError(String),
}
56
/// Handle to one active websocket subscription whose notifications
/// deserialize into `T`.
///
/// Dropping the handle sends the matching unsubscribe request and closes the
/// websocket; `shutdown` stops and joins the background receive thread.
pub struct PubsubClientSubscription<T>
where
    T: DeserializeOwned,
{
    /// Zero-sized marker tying the handle to its notification payload type.
    message_type: PhantomData<T>,
    /// Operation prefix; `send_unsubscribe` appends `Unsubscribe` to it to
    /// form the JSON-RPC method name.
    operation: &'static str,
    /// Websocket shared between this handle and the receive thread.
    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
    /// Id taken from the `result` field of the subscribe reply.
    subscription_id: u64,
    /// Join handle of the receive thread; `None` once `shutdown` has taken it.
    t_cleanup: Option<JoinHandle<()>>,
    /// Flag set by `shutdown` to ask the receive thread to leave its loop.
    exit: Arc<AtomicBool>,
}
68
69impl<T> Drop for PubsubClientSubscription<T>
70where
71 T: DeserializeOwned,
72{
73 fn drop(&mut self) {
74 self.send_unsubscribe()
75 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
76 self.socket
77 .write()
78 .unwrap()
79 .close(None)
80 .unwrap_or_else(|_| warn!("unable to close websocket"));
81 }
82}
83
84impl<T> PubsubClientSubscription<T>
85where
86 T: DeserializeOwned,
87{
88 fn send_subscribe(
89 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
90 body: String,
91 ) -> Result<u64, PubsubClientError> {
92 writable_socket
93 .write()
94 .unwrap()
95 .write_message(Message::Text(body))?;
96 let message = writable_socket.write().unwrap().read_message()?;
97 Self::extract_subscription_id(message)
98 }
99
100 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
101 let message_text = &message.into_text()?;
102 let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
103
104 if let Some(Number(x)) = json_msg.get("result") {
105 if let Some(x) = x.as_u64() {
106 return Ok(x);
107 }
108 }
109 Err(PubsubClientError::UnexpectedMessageError(format!(
111 "{:?}",
112 json_msg
113 )))
114 }
115
116 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
117 let method = format!("{}Unsubscribe", self.operation);
118 self.socket
119 .write()
120 .unwrap()
121 .write_message(Message::Text(
122 json!({
123 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
124 })
125 .to_string(),
126 ))
127 .map_err(|err| err.into())
128 }
129
130 fn get_version(
131 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
132 ) -> Result<semver::Version, PubsubClientError> {
133 writable_socket
134 .write()
135 .unwrap()
136 .write_message(Message::Text(
137 json!({
138 "jsonrpc":"2.0","id":1,"method":"getVersion",
139 })
140 .to_string(),
141 ))?;
142 let message = writable_socket.write().unwrap().read_message()?;
143 let message_text = &message.into_text()?;
144 let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
145
146 if let Some(Object(version_map)) = json_msg.get("result") {
147 if let Some(node_version) = version_map.get("safecoin-core") {
148 let node_version = semver::Version::parse(
149 node_version.as_str().unwrap_or_default(),
150 )
151 .map_err(|e| {
152 PubsubClientError::RequestError(format!(
153 "failed to parse cluster version: {}",
154 e
155 ))
156 })?;
157 return Ok(node_version);
158 }
159 }
160 Err(PubsubClientError::UnexpectedMessageError(format!(
162 "{:?}",
163 json_msg
164 )))
165 }
166
167 fn read_message(
168 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
169 ) -> Result<T, PubsubClientError> {
170 let message = writable_socket.write().unwrap().read_message()?;
171 let message_text = &message.into_text().unwrap();
172 let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
173
174 if let Some(Object(params)) = json_msg.get("params") {
175 if let Some(result) = params.get("result") {
176 let x: T = serde_json::from_value::<T>(result.clone()).unwrap();
177 return Ok(x);
178 }
179 }
180
181 Err(PubsubClientError::UnexpectedMessageError(format!(
183 "{:?}",
184 json_msg
185 )))
186 }
187
188 pub fn shutdown(&mut self) -> std::thread::Result<()> {
189 if self.t_cleanup.is_some() {
190 info!("websocket thread - shutting down");
191 self.exit.store(true, Ordering::Relaxed);
192 let x = self.t_cleanup.take().unwrap().join();
193 info!("websocket thread - shut down.");
194 x
195 } else {
196 warn!("websocket thread - already shut down.");
197 Ok(())
198 }
199 }
200}
201
/// Subscription handle whose notifications are `RpcResponse<RpcLogsResponse>`.
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
/// Pair returned by `PubsubClient::logs_subscribe`: the handle and the
/// channel receiver that yields its notifications.
pub type LogsSubscription = (
    PubsubLogsClientSubscription,
    Receiver<RpcResponse<RpcLogsResponse>>,
);

/// Subscription handle whose notifications are `SlotInfo`.
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
/// Pair returned by `PubsubClient::slot_subscribe`.
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);

/// Subscription handle whose notifications are
/// `RpcResponse<RpcSignatureResult>`.
pub type PubsubSignatureClientSubscription =
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
/// Pair returned by `PubsubClient::signature_subscribe`.
pub type SignatureSubscription = (
    PubsubSignatureClientSubscription,
    Receiver<RpcResponse<RpcSignatureResult>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcBlockUpdate>`.
pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
/// Pair returned by `PubsubClient::block_subscribe`.
pub type BlockSubscription = (
    PubsubBlockClientSubscription,
    Receiver<RpcResponse<RpcBlockUpdate>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcKeyedAccount>`.
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
/// Pair returned by `PubsubClient::program_subscribe`.
pub type ProgramSubscription = (
    PubsubProgramClientSubscription,
    Receiver<RpcResponse<RpcKeyedAccount>>,
);

/// Subscription handle whose notifications are `RpcResponse<UiAccount>`.
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
/// Pair returned by `PubsubClient::account_subscribe`.
pub type AccountSubscription = (
    PubsubAccountClientSubscription,
    Receiver<RpcResponse<UiAccount>>,
);

/// Subscription handle whose notifications are `RpcVote`.
pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
/// Pair returned by `PubsubClient::vote_subscribe`.
pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);

/// Subscription handle whose notifications are `Slot` values.
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
/// Pair returned by `PubsubClient::root_subscribe`.
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
241
/// Field-less type that namespaces the `*_subscribe` associated functions;
/// each of them opens its own websocket connection.
pub struct PubsubClient {}
243
244fn connect_with_retry(
245 url: Url,
246) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
247 let mut connection_retries = 5;
248 loop {
249 let result = connect(url.clone()).map(|(socket, _)| socket);
250 if let Err(tungstenite::Error::Http(response)) = &result {
251 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
252 {
253 let mut duration = Duration::from_millis(500);
254 if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
255 if let Ok(retry_after) = retry_after.to_str() {
256 if let Ok(retry_after) = retry_after.parse::<u64>() {
257 if retry_after < 120 {
258 duration = Duration::from_secs(retry_after);
259 }
260 }
261 }
262 }
263
264 connection_retries -= 1;
265 debug!(
266 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
267 response, connection_retries, duration
268 );
269
270 sleep(duration);
271 continue;
272 }
273 }
274 return result;
275 }
276}
277
278impl PubsubClient {
279 pub fn account_subscribe(
280 url: &str,
281 pubkey: &Pubkey,
282 config: Option<RpcAccountInfoConfig>,
283 ) -> Result<AccountSubscription, PubsubClientError> {
284 let url = Url::parse(url)?;
285 let socket = connect_with_retry(url)?;
286 let (sender, receiver) = unbounded();
287
288 let socket = Arc::new(RwLock::new(socket));
289 let socket_clone = socket.clone();
290 let exit = Arc::new(AtomicBool::new(false));
291 let exit_clone = exit.clone();
292 let body = json!({
293 "jsonrpc":"2.0",
294 "id":1,
295 "method":"accountSubscribe",
296 "params":[
297 pubkey.to_string(),
298 config
299 ]
300 })
301 .to_string();
302 let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
303
304 let t_cleanup = std::thread::spawn(move || {
305 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
306 });
307
308 let result = PubsubClientSubscription {
309 message_type: PhantomData,
310 operation: "account",
311 socket,
312 subscription_id,
313 t_cleanup: Some(t_cleanup),
314 exit,
315 };
316
317 Ok((result, receiver))
318 }
319
320 pub fn block_subscribe(
321 url: &str,
322 filter: RpcBlockSubscribeFilter,
323 config: Option<RpcBlockSubscribeConfig>,
324 ) -> Result<BlockSubscription, PubsubClientError> {
325 let url = Url::parse(url)?;
326 let socket = connect_with_retry(url)?;
327 let (sender, receiver) = unbounded();
328
329 let socket = Arc::new(RwLock::new(socket));
330 let socket_clone = socket.clone();
331 let exit = Arc::new(AtomicBool::new(false));
332 let exit_clone = exit.clone();
333 let body = json!({
334 "jsonrpc":"2.0",
335 "id":1,
336 "method":"blockSubscribe",
337 "params":[filter, config]
338 })
339 .to_string();
340
341 let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
342
343 let t_cleanup = std::thread::spawn(move || {
344 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
345 });
346
347 let result = PubsubClientSubscription {
348 message_type: PhantomData,
349 operation: "block",
350 socket,
351 subscription_id,
352 t_cleanup: Some(t_cleanup),
353 exit,
354 };
355
356 Ok((result, receiver))
357 }
358
359 pub fn logs_subscribe(
360 url: &str,
361 filter: RpcTransactionLogsFilter,
362 config: RpcTransactionLogsConfig,
363 ) -> Result<LogsSubscription, PubsubClientError> {
364 let url = Url::parse(url)?;
365 let socket = connect_with_retry(url)?;
366 let (sender, receiver) = unbounded();
367
368 let socket = Arc::new(RwLock::new(socket));
369 let socket_clone = socket.clone();
370 let exit = Arc::new(AtomicBool::new(false));
371 let exit_clone = exit.clone();
372 let body = json!({
373 "jsonrpc":"2.0",
374 "id":1,
375 "method":"logsSubscribe",
376 "params":[filter, config]
377 })
378 .to_string();
379
380 let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
381
382 let t_cleanup = std::thread::spawn(move || {
383 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
384 });
385
386 let result = PubsubClientSubscription {
387 message_type: PhantomData,
388 operation: "logs",
389 socket,
390 subscription_id,
391 t_cleanup: Some(t_cleanup),
392 exit,
393 };
394
395 Ok((result, receiver))
396 }
397
398 pub fn program_subscribe(
399 url: &str,
400 pubkey: &Pubkey,
401 mut config: Option<RpcProgramAccountsConfig>,
402 ) -> Result<ProgramSubscription, PubsubClientError> {
403 let url = Url::parse(url)?;
404 let socket = connect_with_retry(url)?;
405 let (sender, receiver) = unbounded();
406
407 let socket = Arc::new(RwLock::new(socket));
408 let socket_clone = socket.clone();
409 let exit = Arc::new(AtomicBool::new(false));
410 let exit_clone = exit.clone();
411
412 if let Some(ref mut config) = config {
413 if let Some(ref mut filters) = config.filters {
414 let node_version = PubsubProgramClientSubscription::get_version(&socket_clone).ok();
415 rpc_filter::maybe_map_filters(node_version, filters)
418 .map_err(PubsubClientError::RequestError)?;
419 }
420 }
421
422 let body = json!({
423 "jsonrpc":"2.0",
424 "id":1,
425 "method":"programSubscribe",
426 "params":[
427 pubkey.to_string(),
428 config
429 ]
430 })
431 .to_string();
432 let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
433
434 let t_cleanup = std::thread::spawn(move || {
435 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
436 });
437
438 let result = PubsubClientSubscription {
439 message_type: PhantomData,
440 operation: "program",
441 socket,
442 subscription_id,
443 t_cleanup: Some(t_cleanup),
444 exit,
445 };
446
447 Ok((result, receiver))
448 }
449
450 pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
451 let url = Url::parse(url)?;
452 let socket = connect_with_retry(url)?;
453 let (sender, receiver) = unbounded();
454
455 let socket = Arc::new(RwLock::new(socket));
456 let socket_clone = socket.clone();
457 let exit = Arc::new(AtomicBool::new(false));
458 let exit_clone = exit.clone();
459 let body = json!({
460 "jsonrpc":"2.0",
461 "id":1,
462 "method":"voteSubscribe",
463 })
464 .to_string();
465 let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
466
467 let t_cleanup = std::thread::spawn(move || {
468 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
469 });
470
471 let result = PubsubClientSubscription {
472 message_type: PhantomData,
473 operation: "vote",
474 socket,
475 subscription_id,
476 t_cleanup: Some(t_cleanup),
477 exit,
478 };
479
480 Ok((result, receiver))
481 }
482
483 pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
484 let url = Url::parse(url)?;
485 let socket = connect_with_retry(url)?;
486 let (sender, receiver) = unbounded();
487
488 let socket = Arc::new(RwLock::new(socket));
489 let socket_clone = socket.clone();
490 let exit = Arc::new(AtomicBool::new(false));
491 let exit_clone = exit.clone();
492 let body = json!({
493 "jsonrpc":"2.0",
494 "id":1,
495 "method":"rootSubscribe",
496 })
497 .to_string();
498 let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
499
500 let t_cleanup = std::thread::spawn(move || {
501 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
502 });
503
504 let result = PubsubClientSubscription {
505 message_type: PhantomData,
506 operation: "root",
507 socket,
508 subscription_id,
509 t_cleanup: Some(t_cleanup),
510 exit,
511 };
512
513 Ok((result, receiver))
514 }
515
516 pub fn signature_subscribe(
517 url: &str,
518 signature: &Signature,
519 config: Option<RpcSignatureSubscribeConfig>,
520 ) -> Result<SignatureSubscription, PubsubClientError> {
521 let url = Url::parse(url)?;
522 let socket = connect_with_retry(url)?;
523 let (sender, receiver) = unbounded();
524
525 let socket = Arc::new(RwLock::new(socket));
526 let socket_clone = socket.clone();
527 let exit = Arc::new(AtomicBool::new(false));
528 let exit_clone = exit.clone();
529 let body = json!({
530 "jsonrpc":"2.0",
531 "id":1,
532 "method":"signatureSubscribe",
533 "params":[
534 signature.to_string(),
535 config
536 ]
537 })
538 .to_string();
539 let subscription_id =
540 PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
541
542 let t_cleanup = std::thread::spawn(move || {
543 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
544 });
545
546 let result = PubsubClientSubscription {
547 message_type: PhantomData,
548 operation: "signature",
549 socket,
550 subscription_id,
551 t_cleanup: Some(t_cleanup),
552 exit,
553 };
554
555 Ok((result, receiver))
556 }
557
558 pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
559 let url = Url::parse(url)?;
560 let socket = connect_with_retry(url)?;
561 let (sender, receiver) = unbounded::<SlotInfo>();
562
563 let socket = Arc::new(RwLock::new(socket));
564 let socket_clone = socket.clone();
565 let exit = Arc::new(AtomicBool::new(false));
566 let exit_clone = exit.clone();
567 let body = json!({
568 "jsonrpc":"2.0",
569 "id":1,
570 "method":"slotSubscribe",
571 "params":[]
572 })
573 .to_string();
574 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
575
576 let t_cleanup = std::thread::spawn(move || {
577 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
578 });
579
580 let result = PubsubClientSubscription {
581 message_type: PhantomData,
582 operation: "slot",
583 socket,
584 subscription_id,
585 t_cleanup: Some(t_cleanup),
586 exit,
587 };
588
589 Ok((result, receiver))
590 }
591
592 pub fn slot_updates_subscribe(
593 url: &str,
594 handler: impl Fn(SlotUpdate) + Send + 'static,
595 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
596 let url = Url::parse(url)?;
597 let socket = connect_with_retry(url)?;
598
599 let socket = Arc::new(RwLock::new(socket));
600 let socket_clone = socket.clone();
601 let exit = Arc::new(AtomicBool::new(false));
602 let exit_clone = exit.clone();
603 let body = json!({
604 "jsonrpc":"2.0",
605 "id":1,
606 "method":"slotsUpdatesSubscribe",
607 "params":[]
608 })
609 .to_string();
610 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
611
612 let t_cleanup = std::thread::spawn(move || {
613 Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
614 });
615
616 Ok(PubsubClientSubscription {
617 message_type: PhantomData,
618 operation: "slotsUpdates",
619 socket,
620 subscription_id,
621 t_cleanup: Some(t_cleanup),
622 exit,
623 })
624 }
625
626 fn cleanup_with_sender<T>(
627 exit: Arc<AtomicBool>,
628 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
629 sender: Sender<T>,
630 ) where
631 T: DeserializeOwned + Send + 'static,
632 {
633 let handler = move |message| match sender.send(message) {
634 Ok(_) => (),
635 Err(err) => {
636 info!("receive error: {:?}", err);
637 }
638 };
639 Self::cleanup_with_handler(exit, socket, handler);
640 }
641
642 fn cleanup_with_handler<T, F>(
643 exit: Arc<AtomicBool>,
644 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
645 handler: F,
646 ) where
647 T: DeserializeOwned,
648 F: Fn(T) + Send + 'static,
649 {
650 loop {
651 if exit.load(Ordering::Relaxed) {
652 break;
653 }
654
655 match PubsubClientSubscription::read_message(socket) {
656 Ok(message) => handler(message),
657 Err(err) => {
658 info!("receive error: {:?}", err);
659 break;
660 }
661 }
662 }
663
664 info!("websocket - exited receive loop");
665 }
666}
667
668#[cfg(test)]
669mod tests {
670 }