solana_client/
tpu_client.rs

1use {
2    crate::connection_cache::ConnectionCache,
3    solana_connection_cache::connection_cache::{
4        ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
5        NewConnectionConfig,
6    },
7    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
8    solana_rpc_client::rpc_client::RpcClient,
9    solana_sdk::{
10        message::Message,
11        signers::Signers,
12        transaction::{Transaction, TransactionError},
13        transport::Result as TransportResult,
14    },
15    solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
16    solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
17    std::sync::Arc,
18};
19pub use {
20    crate::nonblocking::tpu_client::TpuSenderError,
21    solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
22};
23
24pub enum TpuClientWrapper {
25    Quic(BackendTpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
26    Udp(BackendTpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
27}
28
29/// Client which sends transactions directly to the current leader's TPU port over UDP.
30/// The client uses RPC to determine the current leader and fetch node contact info
31/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.
32pub struct TpuClient<
33    P, // ConnectionPool
34    M, // ConnectionManager
35    C, // NewConnectionConfig
36> {
37    tpu_client: BackendTpuClient<P, M, C>,
38}
39
40impl<P, M, C> TpuClient<P, M, C>
41where
42    P: ConnectionPool<NewConnectionConfig = C>,
43    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
44    C: NewConnectionConfig,
45{
46    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
47    /// size
48    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
49        self.tpu_client.send_transaction(transaction)
50    }
51
52    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
53    pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
54        self.tpu_client.send_wire_transaction(wire_transaction)
55    }
56
57    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
58    /// size
59    /// Returns the last error if all sends fail
60    pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
61        self.tpu_client.try_send_transaction(transaction)
62    }
63
64    /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
65    /// to fanout size
66    /// Returns the last error if all sends fail
67    pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
68        self.tpu_client.try_send_transaction_batch(transactions)
69    }
70
71    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
72    /// Returns the last error if all sends fail
73    pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
74        self.tpu_client.try_send_wire_transaction(wire_transaction)
75    }
76}
77
78impl TpuClient<QuicPool, QuicConnectionManager, QuicConfig> {
79    /// Create a new client that disconnects when dropped
80    pub fn new(
81        rpc_client: Arc<RpcClient>,
82        websocket_url: &str,
83        config: TpuClientConfig,
84    ) -> Result<Self> {
85        let connection_cache = match ConnectionCache::new("connection_cache_tpu_client") {
86            ConnectionCache::Quic(cache) => cache,
87            ConnectionCache::Udp(_) => {
88                return Err(TpuSenderError::Custom(String::from(
89                    "Invalid default connection cache",
90                )))
91            }
92        };
93        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
94    }
95}
96
97impl<P, M, C> TpuClient<P, M, C>
98where
99    P: ConnectionPool<NewConnectionConfig = C>,
100    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
101    C: NewConnectionConfig,
102{
103    /// Create a new client that disconnects when dropped
104    pub fn new_with_connection_cache(
105        rpc_client: Arc<RpcClient>,
106        websocket_url: &str,
107        config: TpuClientConfig,
108        connection_cache: Arc<BackendConnectionCache<P, M, C>>,
109    ) -> Result<Self> {
110        Ok(Self {
111            tpu_client: BackendTpuClient::new_with_connection_cache(
112                rpc_client,
113                websocket_url,
114                config,
115                connection_cache,
116            )?,
117        })
118    }
119
120    pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
121        &self,
122        messages: &[Message],
123        signers: &T,
124    ) -> Result<Vec<Option<TransactionError>>> {
125        self.tpu_client
126            .send_and_confirm_messages_with_spinner(messages, signers)
127    }
128
129    pub fn rpc_client(&self) -> &RpcClient {
130        self.tpu_client.rpc_client()
131    }
132}