solana_pubsub_client::nonblocking

Module pubsub_client


A client for subscribing to messages from the RPC server.

The PubsubClient implements Solana WebSocket event subscriptions.

This is a nonblocking (async) API. For a blocking API use the synchronous client in crate::pubsub_client.

A single PubsubClient may be used to subscribe to many events via subscription methods like PubsubClient::account_subscribe. Each of these methods returns a PubsubClientResult wrapping a pair: the first element is a BoxStream of subscription-specific RpcResponses, and the second is an unsubscribe closure, an asynchronous function that can be called and awaited to unsubscribe.

Note that BoxStream contains an immutable reference to the PubsubClient that created it. Because of this borrow, the stream is not 'static and cannot be moved on its own into a spawned task, so it has to stay in the same task as the PubsubClient reference it borrows from. PubsubClient itself is Send and Sync, and can be shared between tasks by putting it in an Arc. Thus one viable pattern for creating multiple subscriptions is:

  • create an Arc<PubsubClient>
  • spawn one task for each subscription, sharing the PubsubClient.
  • in each task:
    • create a subscription
    • send the UnsubscribeFn to another task to handle shutdown
    • loop while receiving messages from the subscription
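The steps above can be modeled without a network connection. The following is a minimal std-only sketch, not the crate's API: `MockClient`, `BoxMessages`, `Unsubscribe`, and `run_demo` are hypothetical names, threads stand in for async tasks, and a boxed iterator stands in for `BoxStream`. It only illustrates the ownership shape: the message source borrows the shared client, while the unsubscribe closure is owned and can be sent elsewhere.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `PubsubClient` (not the real type).
struct MockClient {
    endpoint: String,
}

/// Stand-in for `BoxStream`: a boxed message source that borrows the client.
type BoxMessages<'a> = Box<dyn Iterator<Item = String> + Send + 'a>;

/// Stand-in for `UnsubscribeFn`: an owned, sendable, one-shot closure.
type Unsubscribe = Box<dyn FnOnce() + Send>;

impl MockClient {
    /// Returns a message source tied to `&self` and an unsubscribe closure.
    fn subscribe<'a>(&'a self, name: &'static str) -> (BoxMessages<'a>, Unsubscribe) {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_handle = Arc::clone(&stop);
        let endpoint: &'a str = &self.endpoint;
        let mut seq = 0u64;
        // Yields messages until the unsubscribe closure sets the stop flag.
        let messages: BoxMessages<'a> = Box::new(std::iter::from_fn(move || {
            if stop.load(Ordering::SeqCst) {
                return None;
            }
            thread::sleep(Duration::from_millis(5));
            seq += 1;
            Some(format!("{} {} #{}", endpoint, name, seq))
        }));
        let unsubscribe: Unsubscribe =
            Box::new(move || stop_handle.store(true, Ordering::SeqCst));
        (messages, unsubscribe)
    }
}

/// Runs two mock subscriptions and returns the worker names in join order.
fn run_demo() -> Vec<&'static str> {
    // Create the shared client.
    let client = Arc::new(MockClient { endpoint: "mock".to_string() });
    let (unsub_tx, unsub_rx) = channel::<(Unsubscribe, &'static str)>();

    let mut handles = Vec::new();
    for name in ["slot", "root"] {
        let client = Arc::clone(&client);
        let unsub_tx = unsub_tx.clone();
        // One worker per subscription, each sharing the client.
        handles.push(thread::spawn(move || {
            let (messages, unsubscribe) = client.subscribe(name);
            // Hand the unsubscribe closure to the thread that handles shutdown.
            unsub_tx
                .send((unsubscribe, name))
                .map_err(|_| "receiver dropped")
                .expect("channel");
            drop(unsub_tx);
            // Loop while receiving messages; ends after unsubscribe is called.
            for message in messages {
                println!("{}", message);
            }
            name
        }));
    }
    // Drop the original sender so the receiver loop below can terminate.
    drop(unsub_tx);

    for (unsubscribe, name) in unsub_rx {
        println!("unsubscribing from {}", name);
        unsubscribe();
    }

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

Calling `run_demo()` from a `main` function prints a few mock messages and returns once both workers have been unsubscribed and joined. The number of messages printed depends on thread timing.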

This pattern is illustrated in the Examples section below.

By default the block_subscribe and vote_subscribe events are disabled on RPC nodes. They can be enabled by passing --rpc-pubsub-enable-block-subscription and --rpc-pubsub-enable-vote-subscription to agave-validator. When these methods are disabled, the RPC server will return a “Method not found” error message.

§Examples

Demo two async PubsubClient subscriptions with clean shutdown.

This spawns a task for each subscription type. Each task subscribes, sends back a ready message and an unsubscribe closure, then loops, printing the messages it receives. The main task then waits for user input before unsubscribing and waiting on the tasks.

use anyhow::Result;
use futures_util::StreamExt;
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::unbounded_channel;

pub async fn watch_subscriptions(
    websocket_url: &str,
) -> Result<()> {

    // Subscription tasks will send a ready signal when they have subscribed.
    let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();

    // Channel to receive the unsubscribe closures.
    // It carries pairs of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>, &'static str)`,
    // where the first element is a closure to call to unsubscribe and the second is the subscription name.
    let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();

    // The `PubsubClient` must be `Arc`ed to share it across tasks.
    let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);

    let mut join_handles = vec![];

    join_handles.push(("slot", tokio::spawn({
        // Clone things we need before moving their clones into the `async move` block.
        //
        // The subscriptions have to be made from the tasks that will receive the subscription messages,
        // because the subscription streams hold a reference to the `PubsubClient`.
        // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.

        let ready_sender = ready_sender.clone();
        let unsubscribe_sender = unsubscribe_sender.clone();
        let pubsub_client = Arc::clone(&pubsub_client);
        async move {
            let (mut slot_notifications, slot_unsubscribe) =
                pubsub_client.slot_subscribe().await?;

            // With the subscription started,
            // send a signal back to the main task for synchronization.
            ready_sender.send(()).expect("channel");

            // Send the unsubscribe closure back to the main task.
            unsubscribe_sender.send((slot_unsubscribe, "slot"))
                .map_err(|e| format!("{}", e)).expect("channel");

            // Drop senders so that the channels can close.
            // The main task will receive until channels are closed.
            drop((ready_sender, unsubscribe_sender));

            // Do something with the subscribed messages.
            // This loop will end once the main task unsubscribes.
            while let Some(slot_info) = slot_notifications.next().await {
                println!("------------------------------------------------------------");
                println!("slot pubsub result: {:?}", slot_info);
            }

            // This type hint is necessary to allow the `async move` block to use `?`.
            Ok::<_, anyhow::Error>(())
        }
    })));

    join_handles.push(("root", tokio::spawn({
        let ready_sender = ready_sender.clone();
        let unsubscribe_sender = unsubscribe_sender.clone();
        let pubsub_client = Arc::clone(&pubsub_client);
        async move {
            let (mut root_notifications, root_unsubscribe) =
                pubsub_client.root_subscribe().await?;

            ready_sender.send(()).expect("channel");
            unsubscribe_sender.send((root_unsubscribe, "root"))
                .map_err(|e| format!("{}", e)).expect("channel");
            drop((ready_sender, unsubscribe_sender));

            while let Some(root) = root_notifications.next().await {
                println!("------------------------------------------------------------");
                println!("root pubsub result: {:?}", root);
            }

            Ok::<_, anyhow::Error>(())
        }
    })));

    // Drop these senders so that the channels can close
    // and their receivers return `None` below.
    drop(ready_sender);
    drop(unsubscribe_sender);

    // Wait until all subscribers are ready before proceeding with application logic.
    while ready_receiver.recv().await.is_some() {}

    // Do application logic here.

    // Wait for input or some application-specific shutdown condition.
    tokio::io::stdin().read_u8().await?;

    // Unsubscribe from everything, which will shut down all the tasks.
    while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
        println!("unsubscribing from {}", name);
        unsubscribe().await;
    }

    // Wait for the tasks.
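    for (name, handle) in join_handles {
        println!("waiting on task {}", name);
        match handle.await {
            Ok(Ok(())) => {}
            // The task ran to completion but returned an error.
            Ok(Err(e)) => println!("task {} failed: {}", name, e),
            // The task panicked or was cancelled.
            Err(e) => println!("task {} did not complete: {}", name, e),
        }
    }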

    Ok(())
}
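The readiness wait in the example relies on a channel closing once every sender has been dropped. The following std-only sketch isolates that idiom, under the assumption that `std::sync::mpsc` and threads are acceptable stand-ins for tokio's unbounded channel and tasks. `wait_until_ready` is a hypothetical helper, not part of the crate.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Spawns `workers` threads and returns how many ready signals arrived
/// before the channel closed.
fn wait_until_ready(workers: usize) -> usize {
    let (ready_tx, ready_rx) = channel::<()>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let ready_tx = ready_tx.clone();
            thread::spawn(move || {
                // Setup work (for example, subscribing) would happen here.
                ready_tx.send(()).expect("receiver alive");
                // Drop the sender early so the channel can close while
                // the worker keeps running.
                drop(ready_tx);
                // Long-running work would continue here.
            })
        })
        .collect();

    // The original sender must be dropped as well; otherwise `recv`
    // would block forever after the last worker signals.
    drop(ready_tx);

    // `recv` returns `Err` once all senders are gone, ending the loop.
    let mut ready = 0;
    while ready_rx.recv().is_ok() {
        ready += 1;
    }

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    ready
}
```

Forgetting the `drop(ready_tx)` on the spawning side is the usual way this idiom deadlocks, which is why the example above drops both senders explicitly before waiting.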

Structs§

  • PubsubClient: A client for subscribing to messages from the RPC server.

Enums§

Type Aliases§