// solana_client/nonblocking/tpu_client.rs

1pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError};
2use {
3    crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
4    solana_connection_cache::connection_cache::{
5        ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
6        NewConnectionConfig,
7    },
8    solana_message::Message,
9    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
10    solana_rpc_client::nonblocking::rpc_client::RpcClient,
11    solana_signer::signers::Signers,
12    solana_tpu_client::nonblocking::tpu_client::{Result, TpuClient as BackendTpuClient},
13    solana_transaction::Transaction,
14    solana_transaction_error::{TransactionError, TransportResult},
15    std::sync::Arc,
16};
17
18/// Client which sends transactions directly to the current leader's TPU port over UDP.
19/// The client uses RPC to determine the current leader and fetch node contact info
20pub struct TpuClient<
21    P, // ConnectionPool
22    M, // ConnectionManager
23    C, // NewConnectionConfig
24> {
25    tpu_client: BackendTpuClient<P, M, C>,
26}
27
28impl<P, M, C> TpuClient<P, M, C>
29where
30    P: ConnectionPool<NewConnectionConfig = C>,
31    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
32    C: NewConnectionConfig,
33{
34    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
35    /// size
36    pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
37        self.tpu_client.send_transaction(transaction).await
38    }
39
40    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
41    pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
42        self.tpu_client
43            .send_wire_transaction(wire_transaction)
44            .await
45    }
46
47    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
48    /// size
49    /// Returns the last error if all sends fail
50    pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
51        self.tpu_client.try_send_transaction(transaction).await
52    }
53
54    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
55    /// Returns the last error if all sends fail
56    pub async fn try_send_wire_transaction(
57        &self,
58        wire_transaction: Vec<u8>,
59    ) -> TransportResult<()> {
60        self.tpu_client
61            .try_send_wire_transaction(wire_transaction)
62            .await
63    }
64
65    /// Send a batch of wire transactions to the current and upcoming leader TPUs according to
66    /// fanout size
67    /// Returns the last error if all sends fail
68    pub async fn try_send_wire_transaction_batch(
69        &self,
70        wire_transactions: Vec<Vec<u8>>,
71    ) -> TransportResult<()> {
72        self.tpu_client
73            .try_send_wire_transaction_batch(wire_transactions)
74            .await
75    }
76}
77
78impl TpuClient<QuicPool, QuicConnectionManager, QuicConfig> {
79    /// Create a new client that disconnects when dropped
80    pub async fn new(
81        name: &'static str,
82        rpc_client: Arc<RpcClient>,
83        websocket_url: &str,
84        config: TpuClientConfig,
85    ) -> Result<Self> {
86        let connection_cache = match ConnectionCache::new(name) {
87            ConnectionCache::Quic(cache) => cache,
88            ConnectionCache::Udp(_) => {
89                return Err(TpuSenderError::Custom(String::from(
90                    "Invalid default connection cache",
91                )))
92            }
93        };
94        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
95    }
96}
97
98impl<P, M, C> TpuClient<P, M, C>
99where
100    P: ConnectionPool<NewConnectionConfig = C>,
101    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
102    C: NewConnectionConfig,
103{
104    /// Create a new client that disconnects when dropped
105    pub async fn new_with_connection_cache(
106        rpc_client: Arc<RpcClient>,
107        websocket_url: &str,
108        config: TpuClientConfig,
109        connection_cache: Arc<BackendConnectionCache<P, M, C>>,
110    ) -> Result<Self> {
111        Ok(Self {
112            tpu_client: BackendTpuClient::new_with_connection_cache(
113                rpc_client,
114                websocket_url,
115                config,
116                connection_cache,
117            )
118            .await?,
119        })
120    }
121
122    pub async fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
123        &self,
124        messages: &[Message],
125        signers: &T,
126    ) -> Result<Vec<Option<TransactionError>>> {
127        self.tpu_client
128            .send_and_confirm_messages_with_spinner(messages, signers)
129            .await
130    }
131
132    pub fn rpc_client(&self) -> &RpcClient {
133        self.tpu_client.rpc_client()
134    }
135
136    pub async fn shutdown(&mut self) {
137        self.tpu_client.shutdown().await
138    }
139}