solana_client/nonblocking/
tpu_client.rs

1pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError};
2use {
3    crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
4    solana_connection_cache::connection_cache::{
5        ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
6        NewConnectionConfig,
7    },
8    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
9    solana_rpc_client::nonblocking::rpc_client::RpcClient,
10    solana_sdk::{
11        message::Message,
12        signers::Signers,
13        transaction::{Transaction, TransactionError},
14        transport::Result as TransportResult,
15    },
16    solana_tpu_client::nonblocking::tpu_client::{Result, TpuClient as BackendTpuClient},
17    std::sync::Arc,
18};
19
20/// Client which sends transactions directly to the current leader's TPU port over UDP.
21/// The client uses RPC to determine the current leader and fetch node contact info
22pub struct TpuClient<
23    P, // ConnectionPool
24    M, // ConnectionManager
25    C, // NewConnectionConfig
26> {
27    tpu_client: BackendTpuClient<P, M, C>,
28}
29
30impl<P, M, C> TpuClient<P, M, C>
31where
32    P: ConnectionPool<NewConnectionConfig = C>,
33    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
34    C: NewConnectionConfig,
35{
36    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
37    /// size
38    pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
39        self.tpu_client.send_transaction(transaction).await
40    }
41
42    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
43    pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
44        self.tpu_client
45            .send_wire_transaction(wire_transaction)
46            .await
47    }
48
49    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
50    /// size
51    /// Returns the last error if all sends fail
52    pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
53        self.tpu_client.try_send_transaction(transaction).await
54    }
55
56    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
57    /// Returns the last error if all sends fail
58    pub async fn try_send_wire_transaction(
59        &self,
60        wire_transaction: Vec<u8>,
61    ) -> TransportResult<()> {
62        self.tpu_client
63            .try_send_wire_transaction(wire_transaction)
64            .await
65    }
66
67    /// Send a batch of wire transactions to the current and upcoming leader TPUs according to
68    /// fanout size
69    /// Returns the last error if all sends fail
70    pub async fn try_send_wire_transaction_batch(
71        &self,
72        wire_transactions: Vec<Vec<u8>>,
73    ) -> TransportResult<()> {
74        self.tpu_client
75            .try_send_wire_transaction_batch(wire_transactions)
76            .await
77    }
78}
79
80impl TpuClient<QuicPool, QuicConnectionManager, QuicConfig> {
81    /// Create a new client that disconnects when dropped
82    pub async fn new(
83        name: &'static str,
84        rpc_client: Arc<RpcClient>,
85        websocket_url: &str,
86        config: TpuClientConfig,
87    ) -> Result<Self> {
88        let connection_cache = match ConnectionCache::new(name) {
89            ConnectionCache::Quic(cache) => cache,
90            ConnectionCache::Udp(_) => {
91                return Err(TpuSenderError::Custom(String::from(
92                    "Invalid default connection cache",
93                )))
94            }
95        };
96        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
97    }
98}
99
100impl<P, M, C> TpuClient<P, M, C>
101where
102    P: ConnectionPool<NewConnectionConfig = C>,
103    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
104    C: NewConnectionConfig,
105{
106    /// Create a new client that disconnects when dropped
107    pub async fn new_with_connection_cache(
108        rpc_client: Arc<RpcClient>,
109        websocket_url: &str,
110        config: TpuClientConfig,
111        connection_cache: Arc<BackendConnectionCache<P, M, C>>,
112    ) -> Result<Self> {
113        Ok(Self {
114            tpu_client: BackendTpuClient::new_with_connection_cache(
115                rpc_client,
116                websocket_url,
117                config,
118                connection_cache,
119            )
120            .await?,
121        })
122    }
123
124    pub async fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
125        &self,
126        messages: &[Message],
127        signers: &T,
128    ) -> Result<Vec<Option<TransactionError>>> {
129        self.tpu_client
130            .send_and_confirm_messages_with_spinner(messages, signers)
131            .await
132    }
133
134    pub fn rpc_client(&self) -> &RpcClient {
135        self.tpu_client.rpc_client()
136    }
137
138    pub async fn shutdown(&mut self) {
139        self.tpu_client.shutdown().await
140    }
141}