solana_client/
tpu_client.rs

1use {
2    crate::connection_cache::ConnectionCache,
3    solana_connection_cache::connection_cache::{
4        ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
5        NewConnectionConfig,
6    },
7    solana_message::Message,
8    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
9    solana_rpc_client::rpc_client::RpcClient,
10    solana_signer::signers::Signers,
11    solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
12    solana_transaction::Transaction,
13    solana_transaction_error::{TransactionError, TransportResult},
14    solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
15    std::sync::Arc,
16};
17pub use {
18    crate::nonblocking::tpu_client::TpuSenderError,
19    solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
20};
21
/// A backend TPU client instantiated for one of the two transports this module imports
/// (QUIC or UDP), so either flavor can be carried behind a single type.
pub enum TpuClientWrapper {
    /// Backend client parameterized with the QUIC pool, connection manager and config types.
    Quic(BackendTpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
    /// Backend client parameterized with the UDP pool, connection manager and config types.
    Udp(BackendTpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
}
26
/// Client which sends transactions directly to the current leader's TPU port.
///
/// The transport is determined by the connection pool, manager and config type parameters;
/// [`TpuClient::new`] builds the client from a QUIC connection cache.
/// The client uses RPC to determine the current leader and fetch node contact info.
///
/// This is just a thin wrapper over `BackendTpuClient`; use that type directly for more
/// efficiency.
pub struct TpuClient<
    P, // ConnectionPool
    M, // ConnectionManager
    C, // NewConnectionConfig
> {
    // The wrapped backend client; every method on this type delegates to it.
    tpu_client: BackendTpuClient<P, M, C>,
}
37
38impl<P, M, C> TpuClient<P, M, C>
39where
40    P: ConnectionPool<NewConnectionConfig = C>,
41    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
42    C: NewConnectionConfig,
43{
44    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
45    /// size
46    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
47        self.tpu_client.send_transaction(transaction)
48    }
49
50    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
51    pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
52        self.tpu_client.send_wire_transaction(wire_transaction)
53    }
54
55    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
56    /// size
57    /// Returns the last error if all sends fail
58    pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
59        self.tpu_client.try_send_transaction(transaction)
60    }
61
62    /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
63    /// to fanout size
64    /// Returns the last error if all sends fail
65    pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
66        self.tpu_client.try_send_transaction_batch(transactions)
67    }
68
69    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
70    /// Returns the last error if all sends fail
71    pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
72        self.tpu_client.try_send_wire_transaction(wire_transaction)
73    }
74}
75
76impl TpuClient<QuicPool, QuicConnectionManager, QuicConfig> {
77    /// Create a new client that disconnects when dropped
78    pub fn new(
79        rpc_client: Arc<RpcClient>,
80        websocket_url: &str,
81        config: TpuClientConfig,
82    ) -> Result<Self> {
83        let connection_cache = match ConnectionCache::new("connection_cache_tpu_client") {
84            ConnectionCache::Quic(cache) => cache,
85            ConnectionCache::Udp(_) => {
86                return Err(TpuSenderError::Custom(String::from(
87                    "Invalid default connection cache",
88                )))
89            }
90        };
91        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
92    }
93}
94
95impl<P, M, C> TpuClient<P, M, C>
96where
97    P: ConnectionPool<NewConnectionConfig = C>,
98    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
99    C: NewConnectionConfig,
100{
101    /// Create a new client that disconnects when dropped
102    pub fn new_with_connection_cache(
103        rpc_client: Arc<RpcClient>,
104        websocket_url: &str,
105        config: TpuClientConfig,
106        connection_cache: Arc<BackendConnectionCache<P, M, C>>,
107    ) -> Result<Self> {
108        Ok(Self {
109            tpu_client: BackendTpuClient::new_with_connection_cache(
110                rpc_client,
111                websocket_url,
112                config,
113                connection_cache,
114            )?,
115        })
116    }
117
118    pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
119        &self,
120        messages: &[Message],
121        signers: &T,
122    ) -> Result<Vec<Option<TransactionError>>> {
123        self.tpu_client
124            .send_and_confirm_messages_with_spinner(messages, signers)
125    }
126
127    pub fn rpc_client(&self) -> &RpcClient {
128        self.tpu_client.rpc_client()
129    }
130}