Expand description
A client for subscribing to messages from the RPC server.
The PubsubClient
implements Solana WebSocket event
subscriptions.
This is a nonblocking (async) API. For a blocking API use the synchronous
client in crate::pubsub_client
.
A single PubsubClient
client may be used to subscribe to many events via
subscription methods like PubsubClient::account_subscribe
. These methods
return a PubsubClientResult
of a pair, the first element being a
BoxStream
of subscription-specific RpcResponse
s, the second being an
unsubscribe closure, an asynchronous function that can be called and
await
ed to unsubscribe.
Note that BoxStream
contains an immutable reference to the PubsubClient
that created it. This makes BoxStream
not Send
, forcing it to stay in
the same task as its PubsubClient
. PubsubClient
though is Send
and
Sync
, and can be shared between tasks by putting it in an Arc
. Thus
one viable pattern to creating multiple subscriptions is:
- create an
Arc<PubsubClient>
- spawn one task for each subscription, sharing the
PubsubClient
. - in each task:
- create a subscription
- send the
UnsubscribeFn
to another task to handle shutdown - loop while receiving messages from the subscription
This pattern is illustrated in the example below.
By default the block_subscribe
and vote_subscribe
events are
disabled on RPC nodes. They can be enabled by passing
--rpc-pubsub-enable-block-subscription
and
--rpc-pubsub-enable-vote-subscription
to agave-validator
. When these
methods are disabled, the RPC server will return a “Method not found” error
message.
§Examples
Demo two async PubsubClient
subscriptions with clean shutdown.
This spawns a task for each subscription type, each of which subscribes and sends back a ready message and an unsubscribe channel (closure), then loops on printing messages. The main task then waits for user input before unsubscribing and waiting on the tasks.
use anyhow::Result;
use futures_util::StreamExt;
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::unbounded_channel;
pub async fn watch_subscriptions(
websocket_url: &str,
) -> Result<()> {
// Subscription tasks will send a ready signal when they have subscribed.
let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
// Channel to receive unsubscribe channels (actually closures).
// These receive a pair of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>), &'static str)`,
// where the first is a closure to call to unsubscribe, the second is the subscription name.
let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
// The `PubsubClient` must be `Arc`ed to share it across tasks.
let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
let mut join_handles = vec![];
join_handles.push(("slot", tokio::spawn({
// Clone things we need before moving their clones into the `async move` block.
//
// The subscriptions have to be made from the tasks that will receive the subscription messages,
// because the subscription streams hold a reference to the `PubsubClient`.
// Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
let ready_sender = ready_sender.clone();
let unsubscribe_sender = unsubscribe_sender.clone();
let pubsub_client = Arc::clone(&pubsub_client);
async move {
let (mut slot_notifications, slot_unsubscribe) =
pubsub_client.slot_subscribe().await?;
// With the subscription started,
// send a signal back to the main task for synchronization.
ready_sender.send(()).expect("channel");
// Send the unsubscribe closure back to the main task.
unsubscribe_sender.send((slot_unsubscribe, "slot"))
.map_err(|e| format!("{}", e)).expect("channel");
// Drop senders so that the channels can close.
// The main task will receive until channels are closed.
drop((ready_sender, unsubscribe_sender));
// Do something with the subscribed messages.
// This loop will end once the main task unsubscribes.
while let Some(slot_info) = slot_notifications.next().await {
println!("------------------------------------------------------------");
println!("slot pubsub result: {:?}", slot_info);
}
// This type hint is necessary to allow the `async move` block to use `?`.
Ok::<_, anyhow::Error>(())
}
})));
join_handles.push(("root", tokio::spawn({
let ready_sender = ready_sender.clone();
let unsubscribe_sender = unsubscribe_sender.clone();
let pubsub_client = Arc::clone(&pubsub_client);
async move {
let (mut root_notifications, root_unsubscribe) =
pubsub_client.root_subscribe().await?;
ready_sender.send(()).expect("channel");
unsubscribe_sender.send((root_unsubscribe, "root"))
.map_err(|e| format!("{}", e)).expect("channel");
drop((ready_sender, unsubscribe_sender));
while let Some(root) = root_notifications.next().await {
println!("------------------------------------------------------------");
println!("root pubsub result: {:?}", root);
}
Ok::<_, anyhow::Error>(())
}
})));
// Drop these senders so that the channels can close
// and their receivers return `None` below.
drop(ready_sender);
drop(unsubscribe_sender);
// Wait until all subscribers are ready before proceeding with application logic.
while let Some(_) = ready_receiver.recv().await { }
// Do application logic here.
// Wait for input or some application-specific shutdown condition.
tokio::io::stdin().read_u8().await?;
// Unsubscribe from everything, which will shutdown all the tasks.
while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
println!("unsubscribing from {}", name);
unsubscribe().await
}
// Wait for the tasks.
for (name, handle) in join_handles {
println!("waiting on task {}", name);
if let Ok(Err(e)) = handle.await {
println!("task {} failed: {}", name, e);
}
}
Ok(())
}
Structs§
- A client for subscribing to messages from the RPC server.