solana_pubsub_client/nonblocking/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a nonblocking (async) API. For a blocking API use the synchronous
9//! client in [`crate::pubsub_client`].
10//!
11//! A single `PubsubClient` client may be used to subscribe to many events via
12//! subscription methods like [`PubsubClient::account_subscribe`]. These methods
13//! return a [`PubsubClientResult`] of a pair, the first element being a
14//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an
15//! unsubscribe closure, an asynchronous function that can be called and
16//! `await`ed to unsubscribe.
17//!
//! Note that the returned `BoxStream` is tied by its lifetime to the borrow of
//! the `PubsubClient` that created it. It is therefore not `'static` and cannot
//! be handed to another task on its own; it has to stay in the same task as
//! its `PubsubClient`. `PubsubClient` though is `Send` and `Sync`, and can be
//! shared between tasks by putting it in an `Arc`. Thus one viable pattern for
//! creating multiple subscriptions is:
23//!
24//! - create an `Arc<PubsubClient>`
25//! - spawn one task for each subscription, sharing the `PubsubClient`.
26//! - in each task:
27//!   - create a subscription
28//!   - send the `UnsubscribeFn` to another task to handle shutdown
29//!   - loop while receiving messages from the subscription
30//!
31//! This pattern is illustrated in the example below.
32//!
33//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
34//! disabled on RPC nodes. They can be enabled by passing
35//! `--rpc-pubsub-enable-block-subscription` and
36//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
37//! methods are disabled, the RPC server will return a "Method not found" error
38//! message.
39//!
40//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
41//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
42//!
43//! # Examples
44//!
45//! Demo two async `PubsubClient` subscriptions with clean shutdown.
46//!
47//! This spawns a task for each subscription type, each of which subscribes and
48//! sends back a ready message and an unsubscribe channel (closure), then loops
49//! on printing messages. The main task then waits for user input before
50//! unsubscribing and waiting on the tasks.
51//!
52//! ```
53//! use anyhow::Result;
54//! use futures_util::StreamExt;
55//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
56//! use std::sync::Arc;
57//! use tokio::io::AsyncReadExt;
58//! use tokio::sync::mpsc::unbounded_channel;
59//!
60//! pub async fn watch_subscriptions(
61//!     websocket_url: &str,
62//! ) -> Result<()> {
63//!
64//!     // Subscription tasks will send a ready signal when they have subscribed.
65//!     let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
66//!
//!     // Channel to receive unsubscribe channels (actually closures).
//!     // It carries pairs of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>, &'static str)`,
//!     // where the first is a closure to call to unsubscribe, the second is the subscription name.
70//!     let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
71//!
72//!     // The `PubsubClient` must be `Arc`ed to share it across tasks.
73//!     let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
74//!
75//!     let mut join_handles = vec![];
76//!
77//!     join_handles.push(("slot", tokio::spawn({
78//!         // Clone things we need before moving their clones into the `async move` block.
79//!         //
80//!         // The subscriptions have to be made from the tasks that will receive the subscription messages,
81//!         // because the subscription streams hold a reference to the `PubsubClient`.
82//!         // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
83//!
84//!         let ready_sender = ready_sender.clone();
85//!         let unsubscribe_sender = unsubscribe_sender.clone();
86//!         let pubsub_client = Arc::clone(&pubsub_client);
87//!         async move {
88//!             let (mut slot_notifications, slot_unsubscribe) =
89//!                 pubsub_client.slot_subscribe().await?;
90//!
91//!             // With the subscription started,
92//!             // send a signal back to the main task for synchronization.
93//!             ready_sender.send(()).expect("channel");
94//!
95//!             // Send the unsubscribe closure back to the main task.
96//!             unsubscribe_sender.send((slot_unsubscribe, "slot"))
97//!                 .map_err(|e| format!("{}", e)).expect("channel");
98//!
99//!             // Drop senders so that the channels can close.
100//!             // The main task will receive until channels are closed.
101//!             drop((ready_sender, unsubscribe_sender));
102//!
103//!             // Do something with the subscribed messages.
104//!             // This loop will end once the main task unsubscribes.
105//!             while let Some(slot_info) = slot_notifications.next().await {
106//!                 println!("------------------------------------------------------------");
107//!                 println!("slot pubsub result: {:?}", slot_info);
108//!             }
109//!
110//!             // This type hint is necessary to allow the `async move` block to use `?`.
111//!             Ok::<_, anyhow::Error>(())
112//!         }
113//!     })));
114//!
115//!     join_handles.push(("root", tokio::spawn({
116//!         let ready_sender = ready_sender.clone();
117//!         let unsubscribe_sender = unsubscribe_sender.clone();
118//!         let pubsub_client = Arc::clone(&pubsub_client);
119//!         async move {
120//!             let (mut root_notifications, root_unsubscribe) =
121//!                 pubsub_client.root_subscribe().await?;
122//!
123//!             ready_sender.send(()).expect("channel");
124//!             unsubscribe_sender.send((root_unsubscribe, "root"))
125//!                 .map_err(|e| format!("{}", e)).expect("channel");
126//!             drop((ready_sender, unsubscribe_sender));
127//!
128//!             while let Some(root) = root_notifications.next().await {
129//!                 println!("------------------------------------------------------------");
130//!                 println!("root pubsub result: {:?}", root);
131//!             }
132//!
133//!             Ok::<_, anyhow::Error>(())
134//!         }
135//!     })));
136//!
137//!     // Drop these senders so that the channels can close
138//!     // and their receivers return `None` below.
139//!     drop(ready_sender);
140//!     drop(unsubscribe_sender);
141//!
142//!     // Wait until all subscribers are ready before proceeding with application logic.
143//!     while let Some(_) = ready_receiver.recv().await { }
144//!
145//!     // Do application logic here.
146//!
147//!     // Wait for input or some application-specific shutdown condition.
148//!     tokio::io::stdin().read_u8().await?;
149//!
150//!     // Unsubscribe from everything, which will shutdown all the tasks.
151//!     while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
152//!         println!("unsubscribing from {}", name);
153//!         unsubscribe().await
154//!     }
155//!
156//!     // Wait for the tasks.
157//!     for (name, handle) in join_handles {
158//!         println!("waiting on task {}", name);
159//!         if let Ok(Err(e)) = handle.await {
160//!             println!("task {} failed: {}", name, e);
161//!         }
162//!     }
163//!
164//!     Ok(())
165//! }
166//! # Ok::<(), anyhow::Error>(())
167//! ```
168
169use {
170    futures_util::{
171        future::{ready, BoxFuture, FutureExt},
172        sink::SinkExt,
173        stream::{BoxStream, StreamExt},
174    },
175    log::*,
176    serde::de::DeserializeOwned,
177    serde_json::{json, Map, Value},
178    solana_account_decoder::UiAccount,
179    solana_rpc_client_api::{
180        config::{
181            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
182            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
183            RpcTransactionLogsFilter,
184        },
185        error_object::RpcErrorObject,
186        response::{
187            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
188            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
189        },
190    },
191    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
192    std::collections::BTreeMap,
193    thiserror::Error,
194    tokio::{
195        net::TcpStream,
196        sync::{mpsc, oneshot},
197        task::JoinHandle,
198        time::{sleep, Duration},
199    },
200    tokio_stream::wrappers::UnboundedReceiverStream,
201    tokio_tungstenite::{
202        connect_async,
203        tungstenite::{
204            protocol::frame::{coding::CloseCode, CloseFrame},
205            Message,
206        },
207        MaybeTlsStream, WebSocketStream,
208    },
209    url::Url,
210};
211
212pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;
213
214#[derive(Debug, Error)]
215pub enum PubsubClientError {
216    #[error("url parse error")]
217    UrlParseError(#[from] url::ParseError),
218
219    #[error("unable to connect to server")]
220    ConnectionError(tokio_tungstenite::tungstenite::Error),
221
222    #[error("websocket error")]
223    WsError(#[from] tokio_tungstenite::tungstenite::Error),
224
225    #[error("connection closed (({0})")]
226    ConnectionClosed(String),
227
228    #[error("json parse error")]
229    JsonParseError(#[from] serde_json::error::Error),
230
231    #[error("subscribe failed: {reason}")]
232    SubscribeFailed { reason: String, message: String },
233
234    #[error("unexpected message format: {0}")]
235    UnexpectedMessageError(String),
236
237    #[error("request failed: {reason}")]
238    RequestFailed { reason: String, message: String },
239
240    #[error("request error: {0}")]
241    RequestError(String),
242
243    #[error("could not find subscription id: {0}")]
244    UnexpectedSubscriptionResponse(String),
245
246    #[error("could not find node version: {0}")]
247    UnexpectedGetVersionResponse(String),
248}
249
250type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
251type SubscribeResponseMsg =
252    Result<(mpsc::UnboundedReceiver<Value>, UnsubscribeFn), PubsubClientError>;
253type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
254type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
255type RequestMsg = (
256    String,
257    Value,
258    oneshot::Sender<Result<Value, PubsubClientError>>,
259);
260
261/// A client for subscribing to messages from the RPC server.
262///
263/// See the [module documentation][self].
264#[derive(Debug)]
265pub struct PubsubClient {
266    subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
267    _request_sender: mpsc::UnboundedSender<RequestMsg>,
268    shutdown_sender: oneshot::Sender<()>,
269    ws: JoinHandle<PubsubClientResult>,
270}
271
272impl PubsubClient {
273    pub async fn new(url: &str) -> PubsubClientResult<Self> {
274        let url = Url::parse(url)?;
275        let (ws, _response) = connect_async(url)
276            .await
277            .map_err(PubsubClientError::ConnectionError)?;
278
279        let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
280        let (_request_sender, request_receiver) = mpsc::unbounded_channel();
281        let (shutdown_sender, shutdown_receiver) = oneshot::channel();
282
283        #[allow(clippy::used_underscore_binding)]
284        Ok(Self {
285            subscribe_sender,
286            _request_sender,
287            shutdown_sender,
288            ws: tokio::spawn(PubsubClient::run_ws(
289                ws,
290                subscribe_receiver,
291                request_receiver,
292                shutdown_receiver,
293            )),
294        })
295    }
296
297    pub async fn shutdown(self) -> PubsubClientResult {
298        let _ = self.shutdown_sender.send(());
299        self.ws.await.unwrap() // WS future should not be cancelled or panicked
300    }
301
302    #[deprecated(since = "2.0.2", note = "PubsubClient::node_version is no longer used")]
303    pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
304        Ok(())
305    }
306
307    async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
308    where
309        T: DeserializeOwned + Send + 'a,
310    {
311        let (response_sender, response_receiver) = oneshot::channel();
312        self.subscribe_sender
313            .send((operation.to_string(), params, response_sender))
314            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
315
316        let (notifications, unsubscribe) = response_receiver
317            .await
318            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
319        Ok((
320            UnboundedReceiverStream::new(notifications)
321                .filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
322                .boxed(),
323            unsubscribe,
324        ))
325    }
326
327    /// Subscribe to account events.
328    ///
329    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
330    ///
331    /// # RPC Reference
332    ///
333    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
334    ///
335    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket#accountsubscribe
336    pub async fn account_subscribe(
337        &self,
338        pubkey: &Pubkey,
339        config: Option<RpcAccountInfoConfig>,
340    ) -> SubscribeResult<'_, RpcResponse<UiAccount>> {
341        let params = json!([pubkey.to_string(), config]);
342        self.subscribe("account", params).await
343    }
344
345    /// Subscribe to block events.
346    ///
347    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
348    ///
349    /// This method is disabled by default. It can be enabled by passing
350    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
351    ///
352    /// # RPC Reference
353    ///
354    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
355    ///
356    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket#blocksubscribe
357    pub async fn block_subscribe(
358        &self,
359        filter: RpcBlockSubscribeFilter,
360        config: Option<RpcBlockSubscribeConfig>,
361    ) -> SubscribeResult<'_, RpcResponse<RpcBlockUpdate>> {
362        self.subscribe("block", json!([filter, config])).await
363    }
364
365    /// Subscribe to transaction log events.
366    ///
367    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
368    ///
369    /// # RPC Reference
370    ///
371    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
372    ///
373    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket#logssubscribe
374    pub async fn logs_subscribe(
375        &self,
376        filter: RpcTransactionLogsFilter,
377        config: RpcTransactionLogsConfig,
378    ) -> SubscribeResult<'_, RpcResponse<RpcLogsResponse>> {
379        self.subscribe("logs", json!([filter, config])).await
380    }
381
382    /// Subscribe to program account events.
383    ///
384    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
385    /// by the given program changes.
386    ///
387    /// # RPC Reference
388    ///
389    /// This method corresponds directly to the [`programSubscribe`] RPC method.
390    ///
391    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket#programsubscribe
392    pub async fn program_subscribe(
393        &self,
394        pubkey: &Pubkey,
395        config: Option<RpcProgramAccountsConfig>,
396    ) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
397        let params = json!([pubkey.to_string(), config]);
398        self.subscribe("program", params).await
399    }
400
401    /// Subscribe to vote events.
402    ///
403    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
404    /// votes are observed prior to confirmation and may never be confirmed.
405    ///
406    /// This method is disabled by default. It can be enabled by passing
407    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
408    ///
409    /// # RPC Reference
410    ///
411    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
412    ///
413    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket#votesubscribe
414    pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
415        self.subscribe("vote", json!([])).await
416    }
417
418    /// Subscribe to root events.
419    ///
420    /// Receives messages of type [`Slot`] when a new [root] is set by the
421    /// validator.
422    ///
423    /// [root]: https://solana.com/docs/terminology#root
424    ///
425    /// # RPC Reference
426    ///
427    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
428    ///
429    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket#rootsubscribe
430    pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
431        self.subscribe("root", json!([])).await
432    }
433
434    /// Subscribe to transaction confirmation events.
435    ///
436    /// Receives messages of type [`RpcSignatureResult`] when a transaction
437    /// with the given signature is committed.
438    ///
439    /// This is a subscription to a single notification. It is automatically
440    /// cancelled by the server once the notification is sent.
441    ///
442    /// # RPC Reference
443    ///
444    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
445    ///
446    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket#signaturesubscribe
447    pub async fn signature_subscribe(
448        &self,
449        signature: &Signature,
450        config: Option<RpcSignatureSubscribeConfig>,
451    ) -> SubscribeResult<'_, RpcResponse<RpcSignatureResult>> {
452        let params = json!([signature.to_string(), config]);
453        self.subscribe("signature", params).await
454    }
455
456    /// Subscribe to slot events.
457    ///
458    /// Receives messages of type [`SlotInfo`] when a slot is processed.
459    ///
460    /// # RPC Reference
461    ///
462    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
463    ///
464    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket#slotsubscribe
465    pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
466        self.subscribe("slot", json!([])).await
467    }
468
469    /// Subscribe to slot update events.
470    ///
471    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
472    ///
473    /// Note that this method operates differently than other subscriptions:
474    /// instead of sending the message to a receiver on a channel, it accepts a
475    /// `handler` callback that processes the message directly. This processing
476    /// occurs on another thread.
477    ///
478    /// # RPC Reference
479    ///
480    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
481    ///
482    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket#slotsupdatessubscribe
483    pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
484        self.subscribe("slotsUpdates", json!([])).await
485    }
486
487    async fn run_ws(
488        mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
489        mut subscribe_receiver: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
490        mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
491        mut shutdown_receiver: oneshot::Receiver<()>,
492    ) -> PubsubClientResult {
493        let mut request_id: u64 = 0;
494
495        let mut requests_subscribe = BTreeMap::new();
496        let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
497        let mut other_requests = BTreeMap::new();
498        let mut subscriptions = BTreeMap::new();
499        let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();
500
501        loop {
502            tokio::select! {
503                // Send close on shutdown signal
504                _ = (&mut shutdown_receiver) => {
505                    let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
506                    ws.send(Message::Close(Some(frame))).await?;
507                    ws.flush().await?;
508                    break;
509                },
510                // Send `Message::Ping` each 10s if no any other communication
511                () = sleep(Duration::from_secs(10)) => {
512                    ws.send(Message::Ping(Vec::new())).await?;
513                },
514                // Read message for subscribe
515                Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
516                    request_id += 1;
517                    let method = format!("{operation}Subscribe");
518                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
519                    ws.send(Message::Text(text)).await?;
520                    requests_subscribe.insert(request_id, (operation, response_sender));
521                },
522                // Read message for unsubscribe
523                Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => {
524                    subscriptions.remove(&sid);
525                    request_id += 1;
526                    let method = format!("{operation}Unsubscribe");
527                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
528                    ws.send(Message::Text(text)).await?;
529                    requests_unsubscribe.insert(request_id, response_sender);
530                },
531                // Read message for other requests
532                Some((method, params, response_sender)) = request_receiver.recv() => {
533                    request_id += 1;
534                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
535                    ws.send(Message::Text(text)).await?;
536                    other_requests.insert(request_id, response_sender);
537                }
538                // Read incoming WebSocket message
539                next_msg = ws.next() => {
540                    let msg = match next_msg {
541                        Some(msg) => msg?,
542                        None => break,
543                    };
544                    trace!("ws.next(): {:?}", &msg);
545
546                    // Get text from the message
547                    let text = match msg {
548                        Message::Text(text) => text,
549                        Message::Binary(_data) => continue, // Ignore
550                        Message::Ping(data) => {
551                            ws.send(Message::Pong(data)).await?;
552                            continue
553                        },
554                        Message::Pong(_data) => continue,
555                        Message::Close(_frame) => break,
556                        Message::Frame(_frame) => continue,
557                    };
558
559
560                    let mut json: Map<String, Value> = serde_json::from_str(&text)?;
561
562                    // Subscribe/Unsubscribe response, example:
563                    // `{"jsonrpc":"2.0","result":5308752,"id":1}`
564                    if let Some(id) = json.get("id") {
565                        let id = id.as_u64().ok_or_else(|| {
566                            PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.clone() }
567                        })?;
568
569                        let err = json.get("error").map(|error_object| {
570                            match serde_json::from_value::<RpcErrorObject>(error_object.clone()) {
571                                Ok(rpc_error_object) => {
572                                    format!("{} ({})",  rpc_error_object.message, rpc_error_object.code)
573                                }
574                                Err(err) => format!(
575                                    "Failed to deserialize RPC error response: {} [{}]",
576                                    serde_json::to_string(error_object).unwrap(),
577                                    err
578                                )
579                            }
580                        });
581
582                        if let Some(response_sender) = other_requests.remove(&id) {
583                            match err {
584                                Some(reason) => {
585                                    let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.clone()}));
586                                },
587                                None => {
588                                    let json_result = json.get("result").ok_or_else(|| {
589                                        PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.clone() }
590                                    })?;
591                                    if response_sender.send(Ok(json_result.clone())).is_err() {
592                                        break;
593                                    }
594                                }
595                            }
596                        } else if let Some(response_sender) = requests_unsubscribe.remove(&id) {
597                            let _ = response_sender.send(()); // do not care if receiver is closed
598                        } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) {
599                            match err {
600                                Some(reason) => {
601                                    let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()}));
602                                },
603                                None => {
604                                    // Subscribe Id
605                                    let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
606                                        PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.clone() }
607                                    })?;
608
609                                    // Create notifications channel and unsubscribe function
610                                    let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel();
611                                    let unsubscribe_sender = unsubscribe_sender.clone();
612                                    let unsubscribe = Box::new(move || async move {
613                                        let (response_sender, response_receiver) = oneshot::channel();
614                                        // do nothing if ws already closed
615                                        if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() {
616                                            let _ = response_receiver.await; // channel can be closed only if ws is closed
617                                        }
618                                    }.boxed());
619
620                                    if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() {
621                                        break;
622                                    }
623                                    subscriptions.insert(sid, notifications_sender);
624                                }
625                            }
626                        } else {
627                            error!("Unknown request id: {}", id);
628                            break;
629                        }
630                        continue;
631                    }
632
633                    // Notification, example:
634                    // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}`
635                    if let Some(Value::Object(params)) = json.get_mut("params") {
636                        if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
637                            let mut unsubscribe_required = false;
638
639                            if let Some(notifications_sender) = subscriptions.get(&sid) {
640                                if let Some(result) = params.remove("result") {
641                                    if notifications_sender.send(result).is_err() {
642                                        unsubscribe_required = true;
643                                    }
644                                }
645                            } else {
646                                unsubscribe_required = true;
647                            }
648
649                            if unsubscribe_required {
650                                if let Some(Value::String(method)) = json.remove("method") {
651                                    if let Some(operation) = method.strip_suffix("Notification") {
652                                        let (response_sender, _response_receiver) = oneshot::channel();
653                                        let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender));
654                                    }
655                                }
656                            }
657                        }
658                    }
659                }
660            }
661        }
662
663        Ok(())
664    }
665}
666
#[cfg(test)]
mod tests {
    // see client-test/test/client.rs
}