1pub use crate::nonblocking::tpu_client::TpuSenderError;
2use {
3 crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
4 rayon::iter::{IntoParallelIterator, ParallelIterator},
5 solana_client_traits::AsyncClient,
6 solana_clock::Slot,
7 solana_connection_cache::{
8 client_connection::ClientConnection,
9 connection_cache::{
10 ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
11 },
12 },
13 solana_net_utils::bind_to_unspecified,
14 solana_rpc_client::rpc_client::RpcClient,
15 solana_signature::Signature,
16 solana_transaction::{versioned::VersionedTransaction, Transaction},
17 solana_transaction_error::TransportResult,
18 std::{
19 collections::VecDeque,
20 net::UdpSocket,
21 sync::{Arc, RwLock},
22 },
23};
24#[cfg(feature = "spinner")]
25use {
26 solana_message::Message, solana_signer::signers::Signers,
27 solana_transaction_error::TransactionError, tokio::time::Duration,
28};
29
30pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
31pub const DEFAULT_TPU_USE_QUIC: bool = true;
32pub const DEFAULT_VOTE_USE_QUIC: bool = false;
33
34pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 1;
38
39pub type Result<T> = std::result::Result<T, TpuSenderError>;
40
41#[cfg(feature = "spinner")]
43pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
44#[cfg(feature = "spinner")]
46pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
47
48pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
50
51pub const MAX_FANOUT_SLOTS: u64 = 100;
53
54#[derive(Clone, Debug)]
56pub struct TpuClientConfig {
57 pub fanout_slots: u64,
60}
61
62impl Default for TpuClientConfig {
63 fn default() -> Self {
64 Self {
65 fanout_slots: DEFAULT_FANOUT_SLOTS,
66 }
67 }
68}
69
70pub struct TpuClient<
73 P, M, C, > {
77 _deprecated: UdpSocket, rpc_client: Arc<RpcClient>,
80 tpu_client: Arc<NonblockingTpuClient<P, M, C>>,
81}
82
83impl<P, M, C> TpuClient<P, M, C>
84where
85 P: ConnectionPool<NewConnectionConfig = C>,
86 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
87 C: NewConnectionConfig,
88{
89 pub fn send_transaction(&self, transaction: &Transaction) -> bool {
92 self.invoke(self.tpu_client.send_transaction(transaction))
93 }
94
95 pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
97 self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
98 }
99
100 pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
104 self.invoke(self.tpu_client.try_send_transaction(transaction))
105 }
106
107 pub fn send_transaction_to_upcoming_leaders(
113 &self,
114 transaction: &Transaction,
115 ) -> TransportResult<()> {
116 let wire_transaction =
117 bincode::serialize(&transaction).expect("should serialize transaction");
118
119 let leaders = self
120 .tpu_client
121 .get_leader_tpu_service()
122 .unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots());
123
124 for tpu_address in &leaders {
125 let cache = self.tpu_client.get_connection_cache();
126 let conn = cache.get_connection(tpu_address);
127 conn.send_data_async(wire_transaction.clone())?;
128 }
129
130 Ok(())
131 }
132
133 pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
137 let wire_transactions = transactions
138 .into_par_iter()
139 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
140 .collect::<Vec<_>>();
141 self.invoke(
142 self.tpu_client
143 .try_send_wire_transaction_batch(wire_transactions),
144 )
145 }
146
147 pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
150 self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
151 }
152
153 pub fn try_send_wire_transaction_batch(
154 &self,
155 wire_transactions: Vec<Vec<u8>>,
156 ) -> TransportResult<()> {
157 self.invoke(
158 self.tpu_client
159 .try_send_wire_transaction_batch(wire_transactions),
160 )
161 }
162
163 pub fn new(
165 name: &'static str,
166 rpc_client: Arc<RpcClient>,
167 websocket_url: &str,
168 config: TpuClientConfig,
169 connection_manager: M,
170 ) -> Result<Self> {
171 let create_tpu_client = NonblockingTpuClient::new(
172 name,
173 rpc_client.get_inner_client().clone(),
174 websocket_url,
175 config,
176 connection_manager,
177 );
178 let tpu_client =
179 tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
180
181 Ok(Self {
182 _deprecated: bind_to_unspecified().unwrap(),
183 rpc_client,
184 tpu_client: Arc::new(tpu_client),
185 })
186 }
187
188 pub fn new_with_connection_cache(
190 rpc_client: Arc<RpcClient>,
191 websocket_url: &str,
192 config: TpuClientConfig,
193 connection_cache: Arc<ConnectionCache<P, M, C>>,
194 ) -> Result<Self> {
195 let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
196 rpc_client.get_inner_client().clone(),
197 websocket_url,
198 config,
199 connection_cache,
200 );
201 let tpu_client =
202 tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
203
204 Ok(Self {
205 _deprecated: bind_to_unspecified().unwrap(),
206 rpc_client,
207 tpu_client: Arc::new(tpu_client),
208 })
209 }
210
211 #[cfg(feature = "spinner")]
212 pub fn send_and_confirm_messages_with_spinner<T: Signers + ?Sized>(
213 &self,
214 messages: &[Message],
215 signers: &T,
216 ) -> Result<Vec<Option<TransactionError>>> {
217 self.invoke(
218 self.tpu_client
219 .send_and_confirm_messages_with_spinner(messages, signers),
220 )
221 }
222
223 pub fn rpc_client(&self) -> &RpcClient {
224 &self.rpc_client
225 }
226
227 fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
228 tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
232 }
233}
234
235impl<P, M, C> AsyncClient for TpuClient<P, M, C>
238where
239 P: ConnectionPool<NewConnectionConfig = C>,
240 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
241 C: NewConnectionConfig,
242{
243 fn async_send_versioned_transaction(
244 &self,
245 transaction: VersionedTransaction,
246 ) -> TransportResult<Signature> {
247 let wire_transaction =
248 bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
249 self.send_wire_transaction(wire_transaction);
250 Ok(transaction.signatures[0])
251 }
252
253 fn async_send_versioned_transaction_batch(
254 &self,
255 batch: Vec<VersionedTransaction>,
256 ) -> TransportResult<()> {
257 let buffers = batch
258 .into_par_iter()
259 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
260 .collect::<Vec<_>>();
261 self.try_send_wire_transaction_batch(buffers)?;
262 Ok(())
263 }
264}
265
266const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
268
269#[derive(Clone, Debug)]
270pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
271impl RecentLeaderSlots {
272 pub(crate) fn new(current_slot: Slot) -> Self {
273 let mut recent_slots = VecDeque::new();
274 recent_slots.push_back(current_slot);
275 Self(Arc::new(RwLock::new(recent_slots)))
276 }
277
278 pub(crate) fn record_slot(&self, current_slot: Slot) {
279 let mut recent_slots = self.0.write().unwrap();
280 recent_slots.push_back(current_slot);
281 while recent_slots.len() > 12 {
284 recent_slots.pop_front();
285 }
286 }
287
288 pub(crate) fn estimated_current_slot(&self) -> Slot {
290 let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
291 assert!(!recent_slots.is_empty());
292 recent_slots.sort_unstable();
293
294 let max_index = recent_slots.len() - 1;
297 let median_index = max_index / 2;
298 let median_recent_slot = recent_slots[median_index];
299 let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
300 let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
301
302 recent_slots
305 .into_iter()
306 .rev()
307 .find(|slot| *slot <= max_reasonable_current_slot)
308 .unwrap()
309 }
310}
311
312#[cfg(test)]
313impl From<Vec<Slot>> for RecentLeaderSlots {
314 fn from(recent_slots: Vec<Slot>) -> Self {
315 assert!(!recent_slots.is_empty());
316 Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use super::*;
323
324 fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
325 assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
326 }
327
328 #[test]
329 fn test_recent_leader_slots() {
330 assert_slot(RecentLeaderSlots::new(0), 0);
331
332 let mut recent_slots: Vec<Slot> = (1..=12).collect();
333 assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
334
335 recent_slots.reverse();
336 assert_slot(RecentLeaderSlots::from(recent_slots), 12);
337
338 assert_slot(
339 RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
340 1 + MAX_SLOT_SKIP_DISTANCE,
341 );
342 assert_slot(
343 RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
344 0,
345 );
346
347 assert_slot(RecentLeaderSlots::from(vec![1]), 1);
348 assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
349 assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
350 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
351 assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
352 }
353}