1use {
2 crate::{
3 client_error::ClientError,
4 connection_cache::ConnectionCache,
5 nonblocking::{
6 pubsub_client::{PubsubClient, PubsubClientError},
7 rpc_client::RpcClient,
8 tpu_connection::TpuConnection,
9 },
10 rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
11 rpc_response::SlotUpdate,
12 spinner,
13 tpu_client::{
14 LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig,
15 MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL,
16 },
17 },
18 bincode::serialize,
19 futures_util::{future::join_all, stream::StreamExt},
20 log::*,
21 solana_sdk::{
22 clock::Slot,
23 commitment_config::CommitmentConfig,
24 message::Message,
25 signature::SignerError,
26 signers::Signers,
27 transaction::{Transaction, TransactionError},
28 transport::{Result as TransportResult, TransportError},
29 },
30 std::{
31 collections::HashMap,
32 net::SocketAddr,
33 sync::{
34 atomic::{AtomicBool, Ordering},
35 Arc, RwLock,
36 },
37 },
38 thiserror::Error,
39 tokio::{
40 task::JoinHandle,
41 time::{sleep, timeout, Duration, Instant},
42 },
43};
44
45#[derive(Error, Debug)]
46pub enum TpuSenderError {
47 #[error("Pubsub error: {0:?}")]
48 PubsubError(#[from] PubsubClientError),
49 #[error("RPC error: {0:?}")]
50 RpcError(#[from] ClientError),
51 #[error("IO error: {0:?}")]
52 IoError(#[from] std::io::Error),
53 #[error("Signer error: {0:?}")]
54 SignerError(#[from] SignerError),
55 #[error("Custom error: {0}")]
56 Custom(String),
57}
58
59type Result<T> = std::result::Result<T, TpuSenderError>;
60
61pub struct TpuClient {
64 fanout_slots: u64,
65 leader_tpu_service: LeaderTpuService,
66 exit: Arc<AtomicBool>,
67 rpc_client: Arc<RpcClient>,
68 connection_cache: Arc<ConnectionCache>,
69}
70
71async fn send_wire_transaction_to_addr(
72 connection_cache: &ConnectionCache,
73 addr: &SocketAddr,
74 wire_transaction: Vec<u8>,
75) -> TransportResult<()> {
76 let conn = connection_cache.get_nonblocking_connection(addr);
77 conn.send_wire_transaction(wire_transaction.clone()).await
78}
79
80async fn send_wire_transaction_batch_to_addr(
81 connection_cache: &ConnectionCache,
82 addr: &SocketAddr,
83 wire_transactions: &[Vec<u8>],
84) -> TransportResult<()> {
85 let conn = connection_cache.get_nonblocking_connection(addr);
86 conn.send_wire_transaction_batch(wire_transactions).await
87}
88
89impl TpuClient {
90 pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
93 let wire_transaction = serialize(transaction).expect("serialization should succeed");
94 self.send_wire_transaction(wire_transaction).await
95 }
96
97 pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
99 self.try_send_wire_transaction(wire_transaction)
100 .await
101 .is_ok()
102 }
103
104 pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
108 let wire_transaction = serialize(transaction).expect("serialization should succeed");
109 self.try_send_wire_transaction(wire_transaction).await
110 }
111
112 async fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
115 let leaders = self
116 .leader_tpu_service
117 .leader_tpu_sockets(self.fanout_slots);
118 let futures = leaders
119 .iter()
120 .map(|addr| {
121 send_wire_transaction_to_addr(
122 &self.connection_cache,
123 addr,
124 wire_transaction.clone(),
125 )
126 })
127 .collect::<Vec<_>>();
128 let results: Vec<TransportResult<()>> = join_all(futures).await;
129
130 let mut last_error: Option<TransportError> = None;
131 let mut some_success = false;
132 for result in results {
133 if let Err(e) = result {
134 if last_error.is_none() {
135 last_error = Some(e);
136 }
137 } else {
138 some_success = true;
139 }
140 }
141 if !some_success {
142 Err(if let Some(err) = last_error {
143 err
144 } else {
145 std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
146 })
147 } else {
148 Ok(())
149 }
150 }
151
152 pub async fn try_send_wire_transaction_batch(
156 &self,
157 wire_transactions: Vec<Vec<u8>>,
158 ) -> TransportResult<()> {
159 let leaders = self
160 .leader_tpu_service
161 .leader_tpu_sockets(self.fanout_slots);
162 let futures = leaders
163 .iter()
164 .map(|addr| {
165 send_wire_transaction_batch_to_addr(
166 &self.connection_cache,
167 addr,
168 &wire_transactions,
169 )
170 })
171 .collect::<Vec<_>>();
172 let results: Vec<TransportResult<()>> = join_all(futures).await;
173
174 let mut last_error: Option<TransportError> = None;
175 let mut some_success = false;
176 for result in results {
177 if let Err(e) = result {
178 if last_error.is_none() {
179 last_error = Some(e);
180 }
181 } else {
182 some_success = true;
183 }
184 }
185 if !some_success {
186 Err(if let Some(err) = last_error {
187 err
188 } else {
189 std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
190 })
191 } else {
192 Ok(())
193 }
194 }
195
196 pub async fn new(
198 rpc_client: Arc<RpcClient>,
199 websocket_url: &str,
200 config: TpuClientConfig,
201 ) -> Result<Self> {
202 let connection_cache = Arc::new(ConnectionCache::default());
203 Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
204 }
205
206 pub async fn new_with_connection_cache(
208 rpc_client: Arc<RpcClient>,
209 websocket_url: &str,
210 config: TpuClientConfig,
211 connection_cache: Arc<ConnectionCache>,
212 ) -> Result<Self> {
213 let exit = Arc::new(AtomicBool::new(false));
214 let leader_tpu_service =
215 LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?;
216
217 Ok(Self {
218 fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
219 leader_tpu_service,
220 exit,
221 rpc_client,
222 connection_cache,
223 })
224 }
225
226 pub async fn send_and_confirm_messages_with_spinner<T: Signers>(
227 &self,
228 messages: &[Message],
229 signers: &T,
230 ) -> Result<Vec<Option<TransactionError>>> {
231 let mut expired_blockhash_retries = 5;
232 let progress_bar = spinner::new_progress_bar();
233 progress_bar.set_message("Setting up...");
234
235 let mut transactions = messages
236 .iter()
237 .enumerate()
238 .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
239 .collect::<Vec<_>>();
240 let total_transactions = transactions.len();
241 let mut transaction_errors = vec![None; transactions.len()];
242 let mut confirmed_transactions = 0;
243 let mut block_height = self.rpc_client.get_block_height().await?;
244 while expired_blockhash_retries > 0 {
245 let (blockhash, last_valid_block_height) = self
246 .rpc_client
247 .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
248 .await?;
249
250 let mut pending_transactions = HashMap::new();
251 for (i, mut transaction) in transactions {
252 transaction.try_sign(signers, blockhash)?;
253 pending_transactions.insert(transaction.signatures[0], (i, transaction));
254 }
255
256 let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
257 while block_height <= last_valid_block_height {
258 let num_transactions = pending_transactions.len();
259
260 if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
262 for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
263 if !self.send_transaction(transaction).await {
264 let _result = self.rpc_client.send_transaction(transaction).await.ok();
265 }
266 spinner::set_message_for_confirmed_transactions(
267 &progress_bar,
268 confirmed_transactions,
269 total_transactions,
270 None, last_valid_block_height,
272 &format!("Sending {}/{} transactions", index + 1, num_transactions,),
273 );
274 sleep(SEND_TRANSACTION_INTERVAL).await;
275 }
276 last_resend = Instant::now();
277 }
278
279 let mut block_height_refreshes = 10;
281 spinner::set_message_for_confirmed_transactions(
282 &progress_bar,
283 confirmed_transactions,
284 total_transactions,
285 Some(block_height),
286 last_valid_block_height,
287 &format!(
288 "Waiting for next block, {} transactions pending...",
289 num_transactions
290 ),
291 );
292 let mut new_block_height = block_height;
293 while block_height == new_block_height && block_height_refreshes > 0 {
294 sleep(Duration::from_millis(500)).await;
295 new_block_height = self.rpc_client.get_block_height().await?;
296 block_height_refreshes -= 1;
297 }
298 block_height = new_block_height;
299
300 let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
302 for pending_signatures_chunk in
303 pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
304 {
305 if let Ok(result) = self
306 .rpc_client
307 .get_signature_statuses(pending_signatures_chunk)
308 .await
309 {
310 let statuses = result.value;
311 for (signature, status) in
312 pending_signatures_chunk.iter().zip(statuses.into_iter())
313 {
314 if let Some(status) = status {
315 if status.satisfies_commitment(self.rpc_client.commitment()) {
316 if let Some((i, _)) = pending_transactions.remove(signature) {
317 confirmed_transactions += 1;
318 if status.err.is_some() {
319 progress_bar.println(format!(
320 "Failed transaction: {:?}",
321 status
322 ));
323 }
324 transaction_errors[i] = status.err;
325 }
326 }
327 }
328 }
329 }
330 spinner::set_message_for_confirmed_transactions(
331 &progress_bar,
332 confirmed_transactions,
333 total_transactions,
334 Some(block_height),
335 last_valid_block_height,
336 "Checking transaction status...",
337 );
338 }
339
340 if pending_transactions.is_empty() {
341 return Ok(transaction_errors);
342 }
343 }
344
345 transactions = pending_transactions.into_values().collect();
346 progress_bar.println(format!(
347 "Blockhash expired. {} retries remaining",
348 expired_blockhash_retries
349 ));
350 expired_blockhash_retries -= 1;
351 }
352 Err(TpuSenderError::Custom("Max retries exceeded".into()))
353 }
354
355 pub fn rpc_client(&self) -> &RpcClient {
356 &self.rpc_client
357 }
358
359 pub async fn shutdown(&mut self) {
360 self.exit.store(true, Ordering::Relaxed);
361 self.leader_tpu_service.join().await;
362 }
363}
364impl Drop for TpuClient {
365 fn drop(&mut self) {
366 self.exit.store(true, Ordering::Relaxed);
367 }
368}
369
370pub struct LeaderTpuService {
373 recent_slots: RecentLeaderSlots,
374 leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
375 t_leader_tpu_service: Option<JoinHandle<Result<()>>>,
376}
377
378impl LeaderTpuService {
379 pub async fn new(
380 rpc_client: Arc<RpcClient>,
381 websocket_url: &str,
382 exit: Arc<AtomicBool>,
383 ) -> Result<Self> {
384 let start_slot = rpc_client
385 .get_slot_with_commitment(CommitmentConfig::processed())
386 .await?;
387
388 let recent_slots = RecentLeaderSlots::new(start_slot);
389 let slots_in_epoch = rpc_client.get_epoch_info().await?.slots_in_epoch;
390 let leaders = rpc_client
391 .get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))
392 .await?;
393 let cluster_nodes = rpc_client.get_cluster_nodes().await?;
394 let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
395 start_slot,
396 slots_in_epoch,
397 leaders,
398 cluster_nodes,
399 )));
400
401 let pubsub_client = if !websocket_url.is_empty() {
402 Some(PubsubClient::new(websocket_url).await?)
403 } else {
404 None
405 };
406
407 let t_leader_tpu_service = Some({
408 let recent_slots = recent_slots.clone();
409 let leader_tpu_cache = leader_tpu_cache.clone();
410 tokio::spawn(async move {
411 Self::run(
412 rpc_client,
413 recent_slots,
414 leader_tpu_cache,
415 pubsub_client,
416 exit,
417 )
418 .await
419 })
420 });
421
422 Ok(LeaderTpuService {
423 recent_slots,
424 leader_tpu_cache,
425 t_leader_tpu_service,
426 })
427 }
428
429 pub async fn join(&mut self) {
430 if let Some(t_handle) = self.t_leader_tpu_service.take() {
431 t_handle.await.unwrap().unwrap();
432 }
433 }
434
435 pub fn estimated_current_slot(&self) -> Slot {
436 self.recent_slots.estimated_current_slot()
437 }
438
439 fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
440 let current_slot = self.recent_slots.estimated_current_slot();
441 self.leader_tpu_cache
442 .read()
443 .unwrap()
444 .get_leader_sockets(current_slot, fanout_slots)
445 }
446
447 async fn run(
448 rpc_client: Arc<RpcClient>,
449 recent_slots: RecentLeaderSlots,
450 leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
451 pubsub_client: Option<PubsubClient>,
452 exit: Arc<AtomicBool>,
453 ) -> Result<()> {
454 let (mut notifications, unsubscribe) = if let Some(pubsub_client) = &pubsub_client {
455 let (notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
456 (Some(notifications), Some(unsubscribe))
457 } else {
458 (None, None)
459 };
460 let mut last_cluster_refresh = Instant::now();
461 let mut sleep_ms = 1000;
462 loop {
463 if exit.load(Ordering::Relaxed) {
464 if let Some(unsubscribe) = unsubscribe {
465 (unsubscribe)().await;
466 }
467 drop(notifications);
470 if let Some(pubsub_client) = pubsub_client {
471 pubsub_client.shutdown().await.unwrap();
472 };
473 break;
474 }
475
476 sleep(Duration::from_millis(sleep_ms)).await;
478 sleep_ms = 1000;
479
480 if let Some(notifications) = &mut notifications {
481 while let Ok(Some(update)) =
482 timeout(Duration::from_millis(10), notifications.next()).await
483 {
484 let current_slot = match update {
485 SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
488 SlotUpdate::FirstShredReceived { slot, .. } => slot,
491 _ => continue,
492 };
493 recent_slots.record_slot(current_slot);
494 }
495 }
496
497 let cache_update_info = maybe_fetch_cache_info(
498 &leader_tpu_cache,
499 last_cluster_refresh,
500 &rpc_client,
501 &recent_slots,
502 )
503 .await;
504
505 if cache_update_info.has_some() {
506 let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
507 let (has_error, cluster_refreshed) = leader_tpu_cache
508 .update_all(recent_slots.estimated_current_slot(), cache_update_info);
509 if has_error {
510 sleep_ms = 100;
511 }
512 if cluster_refreshed {
513 last_cluster_refresh = Instant::now();
514 }
515 }
516 }
517 Ok(())
518 }
519}
520
521async fn maybe_fetch_cache_info(
522 leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
523 last_cluster_refresh: Instant,
524 rpc_client: &RpcClient,
525 recent_slots: &RecentLeaderSlots,
526) -> LeaderTpuCacheUpdateInfo {
527 let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
530 Some(rpc_client.get_cluster_nodes().await)
531 } else {
532 None
533 };
534
535 let estimated_current_slot = recent_slots.estimated_current_slot();
536 let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
537 let leader_tpu_cache = leader_tpu_cache.read().unwrap();
538 leader_tpu_cache.slot_info()
539 };
540 let maybe_epoch_info =
541 if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
542 Some(rpc_client.get_epoch_info().await)
543 } else {
544 None
545 };
546
547 let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
548 {
549 Some(
550 rpc_client
551 .get_slot_leaders(
552 estimated_current_slot,
553 LeaderTpuCache::fanout(slots_in_epoch),
554 )
555 .await,
556 )
557 } else {
558 None
559 };
560 LeaderTpuCacheUpdateInfo {
561 maybe_cluster_nodes,
562 maybe_epoch_info,
563 maybe_slot_leaders,
564 }
565}