solana_pubsub_client/nonblocking/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a nonblocking (async) API. For a blocking API use the synchronous
9//! client in [`crate::pubsub_client`].
10//!
11//! A single `PubsubClient` client may be used to subscribe to many events via
12//! subscription methods like [`PubsubClient::account_subscribe`]. These methods
13//! return a [`PubsubClientResult`] of a pair, the first element being a
14//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an
15//! unsubscribe closure, an asynchronous function that can be called and
16//! `await`ed to unsubscribe.
17//!
18//! Note that `BoxStream` contains an immutable reference to the `PubsubClient`
19//! that created it. This makes `BoxStream` not `Send`, forcing it to stay in
20//! the same task as its `PubsubClient`. `PubsubClient` though is `Send` and
21//! `Sync`, and can be shared between tasks by putting it in an `Arc`. Thus
22//! one viable pattern to creating multiple subscriptions is:
23//!
24//! - create an `Arc<PubsubClient>`
25//! - spawn one task for each subscription, sharing the `PubsubClient`.
26//! - in each task:
27//!   - create a subscription
28//!   - send the `UnsubscribeFn` to another task to handle shutdown
29//!   - loop while receiving messages from the subscription
30//!
31//! This pattern is illustrated in the example below.
32//!
33//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
34//! disabled on RPC nodes. They can be enabled by passing
35//! `--rpc-pubsub-enable-block-subscription` and
36//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
37//! methods are disabled, the RPC server will return a "Method not found" error
38//! message.
39//!
40//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
41//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
42//!
43//! # Examples
44//!
45//! Demo two async `PubsubClient` subscriptions with clean shutdown.
46//!
47//! This spawns a task for each subscription type, each of which subscribes and
48//! sends back a ready message and an unsubscribe channel (closure), then loops
49//! on printing messages. The main task then waits for user input before
50//! unsubscribing and waiting on the tasks.
51//!
52//! ```
53//! use anyhow::Result;
54//! use futures_util::StreamExt;
55//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
56//! use std::sync::Arc;
57//! use tokio::io::AsyncReadExt;
58//! use tokio::sync::mpsc::unbounded_channel;
59//!
60//! pub async fn watch_subscriptions(
61//!     websocket_url: &str,
62//! ) -> Result<()> {
63//!
64//!     // Subscription tasks will send a ready signal when they have subscribed.
65//!     let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
66//!
67//!     // Channel to receive unsubscribe channels (actually closures).
68//!     // These receive a pair of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>), &'static str)`,
69//!     // where the first is a closure to call to unsubscribe, the second is the subscription name.
70//!     let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
71//!
72//!     // The `PubsubClient` must be `Arc`ed to share it across tasks.
73//!     let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
74//!
75//!     let mut join_handles = vec![];
76//!
77//!     join_handles.push(("slot", tokio::spawn({
78//!         // Clone things we need before moving their clones into the `async move` block.
79//!         //
80//!         // The subscriptions have to be made from the tasks that will receive the subscription messages,
81//!         // because the subscription streams hold a reference to the `PubsubClient`.
82//!         // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
83//!
84//!         let ready_sender = ready_sender.clone();
85//!         let unsubscribe_sender = unsubscribe_sender.clone();
86//!         let pubsub_client = Arc::clone(&pubsub_client);
87//!         async move {
88//!             let (mut slot_notifications, slot_unsubscribe) =
89//!                 pubsub_client.slot_subscribe().await?;
90//!
91//!             // With the subscription started,
92//!             // send a signal back to the main task for synchronization.
93//!             ready_sender.send(()).expect("channel");
94//!
95//!             // Send the unsubscribe closure back to the main task.
96//!             unsubscribe_sender.send((slot_unsubscribe, "slot"))
97//!                 .map_err(|e| format!("{}", e)).expect("channel");
98//!
99//!             // Drop senders so that the channels can close.
100//!             // The main task will receive until channels are closed.
101//!             drop((ready_sender, unsubscribe_sender));
102//!
103//!             // Do something with the subscribed messages.
104//!             // This loop will end once the main task unsubscribes.
105//!             while let Some(slot_info) = slot_notifications.next().await {
106//!                 println!("------------------------------------------------------------");
107//!                 println!("slot pubsub result: {:?}", slot_info);
108//!             }
109//!
110//!             // This type hint is necessary to allow the `async move` block to use `?`.
111//!             Ok::<_, anyhow::Error>(())
112//!         }
113//!     })));
114//!
115//!     join_handles.push(("root", tokio::spawn({
116//!         let ready_sender = ready_sender.clone();
117//!         let unsubscribe_sender = unsubscribe_sender.clone();
118//!         let pubsub_client = Arc::clone(&pubsub_client);
119//!         async move {
120//!             let (mut root_notifications, root_unsubscribe) =
121//!                 pubsub_client.root_subscribe().await?;
122//!
123//!             ready_sender.send(()).expect("channel");
124//!             unsubscribe_sender.send((root_unsubscribe, "root"))
125//!                 .map_err(|e| format!("{}", e)).expect("channel");
126//!             drop((ready_sender, unsubscribe_sender));
127//!
128//!             while let Some(root) = root_notifications.next().await {
129//!                 println!("------------------------------------------------------------");
130//!                 println!("root pubsub result: {:?}", root);
131//!             }
132//!
133//!             Ok::<_, anyhow::Error>(())
134//!         }
135//!     })));
136//!
137//!     // Drop these senders so that the channels can close
138//!     // and their receivers return `None` below.
139//!     drop(ready_sender);
140//!     drop(unsubscribe_sender);
141//!
142//!     // Wait until all subscribers are ready before proceeding with application logic.
143//!     while let Some(_) = ready_receiver.recv().await { }
144//!
145//!     // Do application logic here.
146//!
147//!     // Wait for input or some application-specific shutdown condition.
148//!     tokio::io::stdin().read_u8().await?;
149//!
150//!     // Unsubscribe from everything, which will shutdown all the tasks.
151//!     while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
152//!         println!("unsubscribing from {}", name);
153//!         unsubscribe().await
154//!     }
155//!
156//!     // Wait for the tasks.
157//!     for (name, handle) in join_handles {
158//!         println!("waiting on task {}", name);
159//!         if let Ok(Err(e)) = handle.await {
160//!             println!("task {} failed: {}", name, e);
161//!         }
162//!     }
163//!
164//!     Ok(())
165//! }
166//! # Ok::<(), anyhow::Error>(())
167//! ```
168
169use {
170    futures_util::{
171        future::{ready, BoxFuture, FutureExt},
172        sink::SinkExt,
173        stream::{BoxStream, StreamExt},
174    },
175    log::*,
176    serde::de::DeserializeOwned,
177    serde_json::{json, Map, Value},
178    solana_account_decoder_client_types::UiAccount,
179    solana_clock::Slot,
180    solana_pubkey::Pubkey,
181    solana_rpc_client_api::{
182        config::{
183            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
184            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
185            RpcTransactionLogsFilter,
186        },
187        error_object::RpcErrorObject,
188        response::{
189            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
190            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
191        },
192    },
193    solana_signature::Signature,
194    std::collections::BTreeMap,
195    thiserror::Error,
196    tokio::{
197        net::TcpStream,
198        sync::{mpsc, oneshot},
199        task::JoinHandle,
200        time::{sleep, Duration},
201    },
202    tokio_stream::wrappers::UnboundedReceiverStream,
203    tokio_tungstenite::{
204        connect_async,
205        tungstenite::{
206            protocol::frame::{coding::CloseCode, CloseFrame},
207            Message,
208        },
209        MaybeTlsStream, WebSocketStream,
210    },
211    url::Url,
212};
213
214pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;
215
216#[derive(Debug, Error)]
217pub enum PubsubClientError {
218    #[error("url parse error")]
219    UrlParseError(#[from] url::ParseError),
220
221    #[error("unable to connect to server")]
222    ConnectionError(tokio_tungstenite::tungstenite::Error),
223
224    #[error("websocket error")]
225    WsError(#[from] tokio_tungstenite::tungstenite::Error),
226
227    #[error("connection closed (({0})")]
228    ConnectionClosed(String),
229
230    #[error("json parse error")]
231    JsonParseError(#[from] serde_json::error::Error),
232
233    #[error("subscribe failed: {reason}")]
234    SubscribeFailed { reason: String, message: String },
235
236    #[error("unexpected message format: {0}")]
237    UnexpectedMessageError(String),
238
239    #[error("request failed: {reason}")]
240    RequestFailed { reason: String, message: String },
241
242    #[error("request error: {0}")]
243    RequestError(String),
244
245    #[error("could not find subscription id: {0}")]
246    UnexpectedSubscriptionResponse(String),
247
248    #[error("could not find node version: {0}")]
249    UnexpectedGetVersionResponse(String),
250}
251
252type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
253type SubscribeResponseMsg =
254    Result<(mpsc::UnboundedReceiver<Value>, UnsubscribeFn), PubsubClientError>;
255type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
256type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
257type RequestMsg = (
258    String,
259    Value,
260    oneshot::Sender<Result<Value, PubsubClientError>>,
261);
262
263/// A client for subscribing to messages from the RPC server.
264///
265/// See the [module documentation][self].
266#[derive(Debug)]
267pub struct PubsubClient {
268    subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
269    _request_sender: mpsc::UnboundedSender<RequestMsg>,
270    shutdown_sender: oneshot::Sender<()>,
271    ws: JoinHandle<PubsubClientResult>,
272}
273
274impl PubsubClient {
275    pub async fn new(url: &str) -> PubsubClientResult<Self> {
276        let url = Url::parse(url)?;
277        let (ws, _response) = connect_async(url)
278            .await
279            .map_err(PubsubClientError::ConnectionError)?;
280
281        let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
282        let (_request_sender, request_receiver) = mpsc::unbounded_channel();
283        let (shutdown_sender, shutdown_receiver) = oneshot::channel();
284
285        #[allow(clippy::used_underscore_binding)]
286        Ok(Self {
287            subscribe_sender,
288            _request_sender,
289            shutdown_sender,
290            ws: tokio::spawn(PubsubClient::run_ws(
291                ws,
292                subscribe_receiver,
293                request_receiver,
294                shutdown_receiver,
295            )),
296        })
297    }
298
299    pub async fn shutdown(self) -> PubsubClientResult {
300        let _ = self.shutdown_sender.send(());
301        self.ws.await.unwrap() // WS future should not be cancelled or panicked
302    }
303
304    #[deprecated(since = "2.0.2", note = "PubsubClient::node_version is no longer used")]
305    pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
306        Ok(())
307    }
308
309    async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
310    where
311        T: DeserializeOwned + Send + 'a,
312    {
313        let (response_sender, response_receiver) = oneshot::channel();
314        self.subscribe_sender
315            .send((operation.to_string(), params, response_sender))
316            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
317
318        let (notifications, unsubscribe) = response_receiver
319            .await
320            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
321        Ok((
322            UnboundedReceiverStream::new(notifications)
323                .filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
324                .boxed(),
325            unsubscribe,
326        ))
327    }
328
329    /// Subscribe to account events.
330    ///
331    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
332    ///
333    /// # RPC Reference
334    ///
335    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
336    ///
337    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket#accountsubscribe
338    pub async fn account_subscribe(
339        &self,
340        pubkey: &Pubkey,
341        config: Option<RpcAccountInfoConfig>,
342    ) -> SubscribeResult<'_, RpcResponse<UiAccount>> {
343        let params = json!([pubkey.to_string(), config]);
344        self.subscribe("account", params).await
345    }
346
347    /// Subscribe to block events.
348    ///
349    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
350    ///
351    /// This method is disabled by default. It can be enabled by passing
352    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
353    ///
354    /// # RPC Reference
355    ///
356    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
357    ///
358    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket#blocksubscribe
359    pub async fn block_subscribe(
360        &self,
361        filter: RpcBlockSubscribeFilter,
362        config: Option<RpcBlockSubscribeConfig>,
363    ) -> SubscribeResult<'_, RpcResponse<RpcBlockUpdate>> {
364        self.subscribe("block", json!([filter, config])).await
365    }
366
367    /// Subscribe to transaction log events.
368    ///
369    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
370    ///
371    /// # RPC Reference
372    ///
373    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
374    ///
375    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket#logssubscribe
376    pub async fn logs_subscribe(
377        &self,
378        filter: RpcTransactionLogsFilter,
379        config: RpcTransactionLogsConfig,
380    ) -> SubscribeResult<'_, RpcResponse<RpcLogsResponse>> {
381        self.subscribe("logs", json!([filter, config])).await
382    }
383
384    /// Subscribe to program account events.
385    ///
386    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
387    /// by the given program changes.
388    ///
389    /// # RPC Reference
390    ///
391    /// This method corresponds directly to the [`programSubscribe`] RPC method.
392    ///
393    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket#programsubscribe
394    pub async fn program_subscribe(
395        &self,
396        pubkey: &Pubkey,
397        config: Option<RpcProgramAccountsConfig>,
398    ) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
399        let params = json!([pubkey.to_string(), config]);
400        self.subscribe("program", params).await
401    }
402
403    /// Subscribe to vote events.
404    ///
405    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
406    /// votes are observed prior to confirmation and may never be confirmed.
407    ///
408    /// This method is disabled by default. It can be enabled by passing
409    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
410    ///
411    /// # RPC Reference
412    ///
413    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
414    ///
415    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket#votesubscribe
416    pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
417        self.subscribe("vote", json!([])).await
418    }
419
420    /// Subscribe to root events.
421    ///
422    /// Receives messages of type [`Slot`] when a new [root] is set by the
423    /// validator.
424    ///
425    /// [root]: https://solana.com/docs/terminology#root
426    ///
427    /// # RPC Reference
428    ///
429    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
430    ///
431    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket#rootsubscribe
432    pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
433        self.subscribe("root", json!([])).await
434    }
435
436    /// Subscribe to transaction confirmation events.
437    ///
438    /// Receives messages of type [`RpcSignatureResult`] when a transaction
439    /// with the given signature is committed.
440    ///
441    /// This is a subscription to a single notification. It is automatically
442    /// cancelled by the server once the notification is sent.
443    ///
444    /// # RPC Reference
445    ///
446    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
447    ///
448    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket#signaturesubscribe
449    pub async fn signature_subscribe(
450        &self,
451        signature: &Signature,
452        config: Option<RpcSignatureSubscribeConfig>,
453    ) -> SubscribeResult<'_, RpcResponse<RpcSignatureResult>> {
454        let params = json!([signature.to_string(), config]);
455        self.subscribe("signature", params).await
456    }
457
458    /// Subscribe to slot events.
459    ///
460    /// Receives messages of type [`SlotInfo`] when a slot is processed.
461    ///
462    /// # RPC Reference
463    ///
464    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
465    ///
466    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket#slotsubscribe
467    pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
468        self.subscribe("slot", json!([])).await
469    }
470
471    /// Subscribe to slot update events.
472    ///
473    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
474    ///
475    /// Note that this method operates differently than other subscriptions:
476    /// instead of sending the message to a receiver on a channel, it accepts a
477    /// `handler` callback that processes the message directly. This processing
478    /// occurs on another thread.
479    ///
480    /// # RPC Reference
481    ///
482    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
483    ///
484    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket#slotsupdatessubscribe
485    pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
486        self.subscribe("slotsUpdates", json!([])).await
487    }
488
489    async fn run_ws(
490        mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
491        mut subscribe_receiver: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
492        mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
493        mut shutdown_receiver: oneshot::Receiver<()>,
494    ) -> PubsubClientResult {
495        let mut request_id: u64 = 0;
496
497        let mut requests_subscribe = BTreeMap::new();
498        let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
499        let mut other_requests = BTreeMap::new();
500        let mut subscriptions = BTreeMap::new();
501        let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();
502
503        loop {
504            tokio::select! {
505                // Send close on shutdown signal
506                _ = (&mut shutdown_receiver) => {
507                    let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
508                    ws.send(Message::Close(Some(frame))).await?;
509                    ws.flush().await?;
510                    break;
511                },
512                // Send `Message::Ping` each 10s if no any other communication
513                () = sleep(Duration::from_secs(10)) => {
514                    ws.send(Message::Ping(Vec::new())).await?;
515                },
516                // Read message for subscribe
517                Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
518                    request_id += 1;
519                    let method = format!("{operation}Subscribe");
520                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
521                    ws.send(Message::Text(text)).await?;
522                    requests_subscribe.insert(request_id, (operation, response_sender));
523                },
524                // Read message for unsubscribe
525                Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => {
526                    subscriptions.remove(&sid);
527                    request_id += 1;
528                    let method = format!("{operation}Unsubscribe");
529                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
530                    ws.send(Message::Text(text)).await?;
531                    requests_unsubscribe.insert(request_id, response_sender);
532                },
533                // Read message for other requests
534                Some((method, params, response_sender)) = request_receiver.recv() => {
535                    request_id += 1;
536                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
537                    ws.send(Message::Text(text)).await?;
538                    other_requests.insert(request_id, response_sender);
539                }
540                // Read incoming WebSocket message
541                next_msg = ws.next() => {
542                    let msg = match next_msg {
543                        Some(msg) => msg?,
544                        None => break,
545                    };
546                    trace!("ws.next(): {:?}", &msg);
547
548                    // Get text from the message
549                    let text = match msg {
550                        Message::Text(text) => text,
551                        Message::Binary(_data) => continue, // Ignore
552                        Message::Ping(data) => {
553                            ws.send(Message::Pong(data)).await?;
554                            continue
555                        },
556                        Message::Pong(_data) => continue,
557                        Message::Close(_frame) => break,
558                        Message::Frame(_frame) => continue,
559                    };
560
561
562                    let mut json: Map<String, Value> = serde_json::from_str(&text)?;
563
564                    // Subscribe/Unsubscribe response, example:
565                    // `{"jsonrpc":"2.0","result":5308752,"id":1}`
566                    if let Some(id) = json.get("id") {
567                        let id = id.as_u64().ok_or_else(|| {
568                            PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.clone() }
569                        })?;
570
571                        let err = json.get("error").map(|error_object| {
572                            match serde_json::from_value::<RpcErrorObject>(error_object.clone()) {
573                                Ok(rpc_error_object) => {
574                                    format!("{} ({})",  rpc_error_object.message, rpc_error_object.code)
575                                }
576                                Err(err) => format!(
577                                    "Failed to deserialize RPC error response: {} [{}]",
578                                    serde_json::to_string(error_object).unwrap(),
579                                    err
580                                )
581                            }
582                        });
583
584                        if let Some(response_sender) = other_requests.remove(&id) {
585                            match err {
586                                Some(reason) => {
587                                    let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.clone()}));
588                                },
589                                None => {
590                                    let json_result = json.get("result").ok_or_else(|| {
591                                        PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.clone() }
592                                    })?;
593                                    if response_sender.send(Ok(json_result.clone())).is_err() {
594                                        break;
595                                    }
596                                }
597                            }
598                        } else if let Some(response_sender) = requests_unsubscribe.remove(&id) {
599                            let _ = response_sender.send(()); // do not care if receiver is closed
600                        } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) {
601                            match err {
602                                Some(reason) => {
603                                    let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()}));
604                                },
605                                None => {
606                                    // Subscribe Id
607                                    let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
608                                        PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.clone() }
609                                    })?;
610
611                                    // Create notifications channel and unsubscribe function
612                                    let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel();
613                                    let unsubscribe_sender = unsubscribe_sender.clone();
614                                    let unsubscribe = Box::new(move || async move {
615                                        let (response_sender, response_receiver) = oneshot::channel();
616                                        // do nothing if ws already closed
617                                        if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() {
618                                            let _ = response_receiver.await; // channel can be closed only if ws is closed
619                                        }
620                                    }.boxed());
621
622                                    if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() {
623                                        break;
624                                    }
625                                    subscriptions.insert(sid, notifications_sender);
626                                }
627                            }
628                        } else {
629                            error!("Unknown request id: {}", id);
630                            break;
631                        }
632                        continue;
633                    }
634
635                    // Notification, example:
636                    // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}`
637                    if let Some(Value::Object(params)) = json.get_mut("params") {
638                        if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
639                            let mut unsubscribe_required = false;
640
641                            if let Some(notifications_sender) = subscriptions.get(&sid) {
642                                if let Some(result) = params.remove("result") {
643                                    if notifications_sender.send(result).is_err() {
644                                        unsubscribe_required = true;
645                                    }
646                                }
647                            } else {
648                                unsubscribe_required = true;
649                            }
650
651                            if unsubscribe_required {
652                                if let Some(Value::String(method)) = json.remove("method") {
653                                    if let Some(operation) = method.strip_suffix("Notification") {
654                                        let (response_sender, _response_receiver) = oneshot::channel();
655                                        let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender));
656                                    }
657                                }
658                            }
659                        }
660                    }
661                }
662            }
663        }
664
665        Ok(())
666    }
667}
668
669#[cfg(test)]
670mod tests {
671    // see client-test/test/client.rs
672}