// safecoin_client/nonblocking/tpu_client.rs

use {
    crate::{
        client_error::ClientError,
        connection_cache::ConnectionCache,
        nonblocking::{
            pubsub_client::{PubsubClient, PubsubClientError},
            rpc_client::RpcClient,
            tpu_connection::TpuConnection,
        },
        rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
        rpc_response::SlotUpdate,
        spinner,
        tpu_client::{
            LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig,
            MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL,
        },
    },
    bincode::serialize,
    futures_util::{future::join_all, stream::StreamExt},
    log::*,
    solana_sdk::{
        clock::Slot,
        commitment_config::CommitmentConfig,
        message::Message,
        signature::SignerError,
        signers::Signers,
        transaction::{Transaction, TransactionError},
        transport::{Result as TransportResult, TransportError},
    },
    std::{
        collections::HashMap,
        net::SocketAddr,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
    },
    thiserror::Error,
    tokio::{
        task::JoinHandle,
        time::{sleep, timeout, Duration, Instant},
    },
};

#[derive(Error, Debug)]
pub enum TpuSenderError {
    #[error("Pubsub error: {0:?}")]
    PubsubError(#[from] PubsubClientError),
    #[error("RPC error: {0:?}")]
    RpcError(#[from] ClientError),
    #[error("IO error: {0:?}")]
    IoError(#[from] std::io::Error),
    #[error("Signer error: {0:?}")]
    SignerError(#[from] SignerError),
    #[error("Custom error: {0}")]
    Custom(String),
}

type Result<T> = std::result::Result<T, TpuSenderError>;

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info.
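///
/// A minimal usage sketch (marked `ignore` since the endpoint URLs are
/// placeholders and the crate path may differ in your build):
/// ```ignore
/// use std::sync::Arc;
/// use safecoin_client::{
///     nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
///     tpu_client::TpuClientConfig,
/// };
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let rpc_client = Arc::new(RpcClient::new("http://localhost:8899".to_string()));
/// let tpu_client = TpuClient::new(
///     rpc_client,
///     "ws://localhost:8900",
///     TpuClientConfig::default(),
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```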
pub struct TpuClient {
    fanout_slots: u64,
    leader_tpu_service: LeaderTpuService,
    exit: Arc<AtomicBool>,
    rpc_client: Arc<RpcClient>,
    connection_cache: Arc<ConnectionCache>,
}

async fn send_wire_transaction_to_addr(
    connection_cache: &ConnectionCache,
    addr: &SocketAddr,
    wire_transaction: Vec<u8>,
) -> TransportResult<()> {
    let conn = connection_cache.get_nonblocking_connection(addr);
    conn.send_wire_transaction(wire_transaction).await
}

async fn send_wire_transaction_batch_to_addr(
    connection_cache: &ConnectionCache,
    addr: &SocketAddr,
    wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
    let conn = connection_cache.get_nonblocking_connection(addr);
    conn.send_wire_transaction_batch(wire_transactions).await
}

impl TpuClient {
    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
    /// size
    pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.send_wire_transaction(wire_transaction).await
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
    pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
        self.try_send_wire_transaction(wire_transaction)
            .await
            .is_ok()
    }

    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
    /// size.
    /// Returns the first error encountered if all sends fail
    pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.try_send_wire_transaction(wire_transaction).await
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size.
    /// Returns the first error encountered if all sends fail
    async fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
        let leaders = self
            .leader_tpu_service
            .leader_tpu_sockets(self.fanout_slots);
        let futures = leaders
            .iter()
            .map(|addr| {
                send_wire_transaction_to_addr(
                    &self.connection_cache,
                    addr,
                    wire_transaction.clone(),
                )
            })
            .collect::<Vec<_>>();
        let results: Vec<TransportResult<()>> = join_all(futures).await;

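        // Treat the fanout send as successful if any leader accepted the
        // transaction, and otherwise surface the first error observed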
        let mut first_error: Option<TransportError> = None;
        let mut some_success = false;
        for result in results {
            if let Err(e) = result {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            } else {
                some_success = true;
            }
        }
        if !some_success {
            Err(if let Some(err) = first_error {
                err
            } else {
                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
            })
        } else {
            Ok(())
        }
    }

    /// Send a batch of wire transactions to the current and upcoming leader TPUs according to
    /// fanout size.
    /// Returns the first error encountered if all sends fail
    pub async fn try_send_wire_transaction_batch(
        &self,
        wire_transactions: Vec<Vec<u8>>,
    ) -> TransportResult<()> {
        let leaders = self
            .leader_tpu_service
            .leader_tpu_sockets(self.fanout_slots);
        let futures = leaders
            .iter()
            .map(|addr| {
                send_wire_transaction_batch_to_addr(
                    &self.connection_cache,
                    addr,
                    &wire_transactions,
                )
            })
            .collect::<Vec<_>>();
        let results: Vec<TransportResult<()>> = join_all(futures).await;

        let mut first_error: Option<TransportError> = None;
        let mut some_success = false;
        for result in results {
            if let Err(e) = result {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            } else {
                some_success = true;
            }
        }
        if !some_success {
            Err(if let Some(err) = first_error {
                err
            } else {
                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
            })
        } else {
            Ok(())
        }
    }

    /// Create a new client that disconnects when dropped
    pub async fn new(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
    ) -> Result<Self> {
        let connection_cache = Arc::new(ConnectionCache::default());
        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
    }

    /// Create a new client that disconnects when dropped
    pub async fn new_with_connection_cache(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
        connection_cache: Arc<ConnectionCache>,
    ) -> Result<Self> {
        let exit = Arc::new(AtomicBool::new(false));
        let leader_tpu_service =
            LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?;

        Ok(Self {
            fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
            leader_tpu_service,
            exit,
            rpc_client,
            connection_cache,
        })
    }

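    /// Sign and send `messages` to the cluster, displaying progress on a
    /// spinner. Pending transactions are periodically re-sent until they are
    /// confirmed at the client's commitment level or the blockhash expires;
    /// on expiry the remaining transactions are re-signed with a fresh
    /// blockhash, retrying up to five times before giving up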
    pub async fn send_and_confirm_messages_with_spinner<T: Signers>(
        &self,
        messages: &[Message],
        signers: &T,
    ) -> Result<Vec<Option<TransactionError>>> {
        let mut expired_blockhash_retries = 5;
        let progress_bar = spinner::new_progress_bar();
        progress_bar.set_message("Setting up...");

        let mut transactions = messages
            .iter()
            .enumerate()
            .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
            .collect::<Vec<_>>();
        let total_transactions = transactions.len();
        let mut transaction_errors = vec![None; transactions.len()];
        let mut confirmed_transactions = 0;
        let mut block_height = self.rpc_client.get_block_height().await?;
        while expired_blockhash_retries > 0 {
            let (blockhash, last_valid_block_height) = self
                .rpc_client
                .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
                .await?;

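            // Sign (or re-sign) every remaining transaction with the fresh
            // blockhash and index it by signature so confirmations can be
            // matched back to the original message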
            let mut pending_transactions = HashMap::new();
            for (i, mut transaction) in transactions {
                transaction.try_sign(signers, blockhash)?;
                pending_transactions.insert(transaction.signatures[0], (i, transaction));
            }

            let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
            while block_height <= last_valid_block_height {
                let num_transactions = pending_transactions.len();

                // Periodically re-send all pending transactions
                if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
                    for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
                        if !self.send_transaction(transaction).await {
                            let _result = self.rpc_client.send_transaction(transaction).await.ok();
                        }
                        spinner::set_message_for_confirmed_transactions(
                            &progress_bar,
                            confirmed_transactions,
                            total_transactions,
                            None, //block_height,
                            last_valid_block_height,
                            &format!("Sending {}/{} transactions", index + 1, num_transactions),
                        );
                        sleep(SEND_TRANSACTION_INTERVAL).await;
                    }
                    last_resend = Instant::now();
                }

                // Wait for the next block before checking for transaction statuses
                let mut block_height_refreshes = 10;
                spinner::set_message_for_confirmed_transactions(
                    &progress_bar,
                    confirmed_transactions,
                    total_transactions,
                    Some(block_height),
                    last_valid_block_height,
                    &format!(
                        "Waiting for next block, {} transactions pending...",
                        num_transactions
                    ),
                );
                let mut new_block_height = block_height;
                while block_height == new_block_height && block_height_refreshes > 0 {
                    sleep(Duration::from_millis(500)).await;
                    new_block_height = self.rpc_client.get_block_height().await?;
                    block_height_refreshes -= 1;
                }
                block_height = new_block_height;

                // Collect statuses for the transactions, drop those that are confirmed
                let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
                for pending_signatures_chunk in
                    pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
                {
                    if let Ok(result) = self
                        .rpc_client
                        .get_signature_statuses(pending_signatures_chunk)
                        .await
                    {
                        let statuses = result.value;
                        for (signature, status) in
                            pending_signatures_chunk.iter().zip(statuses.into_iter())
                        {
                            if let Some(status) = status {
                                if status.satisfies_commitment(self.rpc_client.commitment()) {
                                    if let Some((i, _)) = pending_transactions.remove(signature) {
                                        confirmed_transactions += 1;
                                        if status.err.is_some() {
                                            progress_bar.println(format!(
                                                "Failed transaction: {:?}",
                                                status
                                            ));
                                        }
                                        transaction_errors[i] = status.err;
                                    }
                                }
                            }
                        }
                    }
                    spinner::set_message_for_confirmed_transactions(
                        &progress_bar,
                        confirmed_transactions,
                        total_transactions,
                        Some(block_height),
                        last_valid_block_height,
                        "Checking transaction status...",
                    );
                }

                if pending_transactions.is_empty() {
                    return Ok(transaction_errors);
                }
            }

            transactions = pending_transactions.into_values().collect();
            progress_bar.println(format!(
                "Blockhash expired. {} retries remaining",
                expired_blockhash_retries
            ));
            expired_blockhash_retries -= 1;
        }
        Err(TpuSenderError::Custom("Max retries exceeded".into()))
    }

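    /// Returns the RPC client in use by this TpuClient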
    pub fn rpc_client(&self) -> &RpcClient {
        &self.rpc_client
    }

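    /// Signal the leader-tracking service to exit and wait for its background
    /// task to finish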
    pub async fn shutdown(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
        self.leader_tpu_service.join().await;
    }
}

impl Drop for TpuClient {
    fn drop(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
    }
}

/// Service that tracks upcoming leaders and maintains an up-to-date mapping
/// of leader id to TPU socket address.
pub struct LeaderTpuService {
    recent_slots: RecentLeaderSlots,
    leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
    t_leader_tpu_service: Option<JoinHandle<Result<()>>>,
}

impl LeaderTpuService {
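    /// Start the service: seed the leader schedule and cluster TPU map over
    /// RPC, then spawn a background task that keeps them fresh (using the
    /// websocket slot-update subscription when `websocket_url` is non-empty)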
    pub async fn new(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        exit: Arc<AtomicBool>,
    ) -> Result<Self> {
        let start_slot = rpc_client
            .get_slot_with_commitment(CommitmentConfig::processed())
            .await?;

        let recent_slots = RecentLeaderSlots::new(start_slot);
        let slots_in_epoch = rpc_client.get_epoch_info().await?.slots_in_epoch;
        let leaders = rpc_client
            .get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))
            .await?;
        let cluster_nodes = rpc_client.get_cluster_nodes().await?;
        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
            start_slot,
            slots_in_epoch,
            leaders,
            cluster_nodes,
        )));

        let pubsub_client = if !websocket_url.is_empty() {
            Some(PubsubClient::new(websocket_url).await?)
        } else {
            None
        };

        let t_leader_tpu_service = Some({
            let recent_slots = recent_slots.clone();
            let leader_tpu_cache = leader_tpu_cache.clone();
            tokio::spawn(async move {
                Self::run(
                    rpc_client,
                    recent_slots,
                    leader_tpu_cache,
                    pubsub_client,
                    exit,
                )
                .await
            })
        });

        Ok(LeaderTpuService {
            recent_slots,
            leader_tpu_cache,
            t_leader_tpu_service,
        })
    }

    pub async fn join(&mut self) {
        if let Some(t_handle) = self.t_leader_tpu_service.take() {
            t_handle.await.unwrap().unwrap();
        }
    }

    pub fn estimated_current_slot(&self) -> Slot {
        self.recent_slots.estimated_current_slot()
    }

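    /// TPU socket addresses for the leaders of the next `fanout_slots` slots,
    /// starting from the estimated current slot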
    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
        let current_slot = self.recent_slots.estimated_current_slot();
        self.leader_tpu_cache
            .read()
            .unwrap()
            .get_leader_sockets(current_slot, fanout_slots)
    }

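    /// Background task: record slot progress from the pubsub feed and
    /// periodically refresh the leader schedule and cluster TPU map until
    /// `exit` is set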
    async fn run(
        rpc_client: Arc<RpcClient>,
        recent_slots: RecentLeaderSlots,
        leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
        pubsub_client: Option<PubsubClient>,
        exit: Arc<AtomicBool>,
    ) -> Result<()> {
        let (mut notifications, unsubscribe) = if let Some(pubsub_client) = &pubsub_client {
            let (notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
            (Some(notifications), Some(unsubscribe))
        } else {
            (None, None)
        };
        let mut last_cluster_refresh = Instant::now();
        let mut sleep_ms = 1000;
        loop {
            if exit.load(Ordering::Relaxed) {
                if let Some(unsubscribe) = unsubscribe {
                    (unsubscribe)().await;
                }
                // `notifications` holds a reference into `pubsub_client`, so it
                // must be dropped before `pubsub_client` is moved and shut down
                drop(notifications);
                if let Some(pubsub_client) = pubsub_client {
                    pubsub_client.shutdown().await.unwrap();
                };
                break;
            }

            // Sleep a few slots before checking whether the leader cache needs
            // to be refreshed again
            sleep(Duration::from_millis(sleep_ms)).await;
            sleep_ms = 1000;

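            // Drain any queued slot updates, waiting at most 10ms for each so
            // a quiet websocket cannot stall the refresh logic below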
            if let Some(notifications) = &mut notifications {
                while let Ok(Some(update)) =
                    timeout(Duration::from_millis(10), notifications.next()).await
                {
                    let current_slot = match update {
                        // This update indicates that a full slot was received by the connected
                        // node so we can stop sending transactions to the leader for that slot
                        SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
                        // This update indicates that we have just received the first shred from
                        // the leader for this slot and they are probably still accepting transactions
                        SlotUpdate::FirstShredReceived { slot, .. } => slot,
                        _ => continue,
                    };
                    recent_slots.record_slot(current_slot);
                }
            }

            let cache_update_info = maybe_fetch_cache_info(
                &leader_tpu_cache,
                last_cluster_refresh,
                &rpc_client,
                &recent_slots,
            )
            .await;

            if cache_update_info.has_some() {
                let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
                let (has_error, cluster_refreshed) = leader_tpu_cache
                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
                if has_error {
                    sleep_ms = 100;
                }
                if cluster_refreshed {
                    last_cluster_refresh = Instant::now();
                }
            }
        }
        Ok(())
    }
}

async fn maybe_fetch_cache_info(
    leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
    last_cluster_refresh: Instant,
    rpc_client: &RpcClient,
    recent_slots: &RecentLeaderSlots,
) -> LeaderTpuCacheUpdateInfo {
    // Refresh cluster TPU ports every 5 minutes in case validators restart
    // with a new port configuration or new validators come online
    let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
        Some(rpc_client.get_cluster_nodes().await)
    } else {
        None
    };

    let estimated_current_slot = recent_slots.estimated_current_slot();
    let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
        let leader_tpu_cache = leader_tpu_cache.read().unwrap();
        leader_tpu_cache.slot_info()
    };
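    // Refresh epoch info so `slots_in_epoch` stays accurate as the cluster
    // approaches and crosses an epoch boundary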
    let maybe_epoch_info =
        if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
            Some(rpc_client.get_epoch_info().await)
        } else {
            None
        };

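    // Fetch the next window of slot leaders once the estimated current slot
    // comes within MAX_FANOUT_SLOTS of the last slot in the cached schedule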
    let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
    {
        Some(
            rpc_client
                .get_slot_leaders(
                    estimated_current_slot,
                    LeaderTpuCache::fanout(slots_in_epoch),
                )
                .await,
        )
    } else {
        None
    };
    LeaderTpuCacheUpdateInfo {
        maybe_cluster_nodes,
        maybe_epoch_info,
        maybe_slot_leaders,
    }
}