safecoin_client/tpu_client.rs

use {
    crate::{
        client_error::{ClientError, Result as ClientResult},
        connection_cache::ConnectionCache,
        pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
        rpc_client::RpcClient,
        rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
        rpc_response::{RpcContactInfo, SlotUpdate},
        spinner,
        tpu_connection::TpuConnection,
    },
    bincode::serialize,
    log::*,
    rayon::iter::{IntoParallelIterator, ParallelIterator},
    solana_sdk::{
        clock::Slot,
        commitment_config::CommitmentConfig,
        epoch_info::EpochInfo,
        message::Message,
        pubkey::Pubkey,
        signature::SignerError,
        signers::Signers,
        transaction::{Transaction, TransactionError},
        transport::{Result as TransportResult, TransportError},
    },
    std::{
        collections::{HashMap, HashSet, VecDeque},
        net::{SocketAddr, UdpSocket},
        str::FromStr,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
        thread::{sleep, JoinHandle},
    },
    thiserror::Error,
    tokio::time::{Duration, Instant},
};

#[derive(Error, Debug)]
pub enum TpuSenderError {
    #[error("Pubsub error: {0:?}")]
    PubsubError(#[from] PubsubClientError),
    #[error("RPC error: {0:?}")]
    RpcError(#[from] ClientError),
    #[error("IO error: {0:?}")]
    IoError(#[from] std::io::Error),
    #[error("Signer error: {0:?}")]
    SignerError(#[from] SignerError),
    #[error("Custom error: {0}")]
    Custom(String),
}

type Result<T> = std::result::Result<T, TpuSenderError>;

/// Default number of slots used to build TPU socket fanout set
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;

/// Maximum number of slots used to build TPU socket fanout set
pub const MAX_FANOUT_SLOTS: u64 = 100;

/// Send at ~100 TPS
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);

/// Config params for `TpuClient`
#[derive(Clone, Debug)]
pub struct TpuClientConfig {
    /// The range of upcoming slots to include when determining which
    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
    pub fanout_slots: u64,
}

impl Default for TpuClientConfig {
    fn default() -> Self {
        Self {
            fanout_slots: DEFAULT_FANOUT_SLOTS,
        }
    }
}

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient {
    _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
    fanout_slots: u64,
    leader_tpu_service: LeaderTpuService,
    exit: Arc<AtomicBool>,
    rpc_client: Arc<RpcClient>,
    connection_cache: Arc<ConnectionCache>,
}

impl TpuClient {
    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
    /// size
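    ///
    /// Returns `true` when the serialized transaction was handed to at least one
    /// leader TPU connection; use `try_send_transaction` to inspect the last send
    /// error instead.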
    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.send_wire_transaction(wire_transaction)
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
    pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
        self.try_send_wire_transaction(wire_transaction).is_ok()
    }

    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
    /// size
    /// Returns the last error if all sends fail
    pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.try_send_wire_transaction(wire_transaction)
    }

    /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
    /// to fanout size
    /// Returns the last error if all sends fail
    pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
        let wire_transactions = transactions
            .into_par_iter()
            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
            .collect::<Vec<_>>();
        self.try_send_wire_transaction_batch(wire_transactions)
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
    /// Returns the last error if all sends fail
    fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
        let mut last_error: Option<TransportError> = None;
        let mut some_success = false;

        for tpu_address in self
            .leader_tpu_service
            .leader_tpu_sockets(self.fanout_slots)
        {
            let conn = self.connection_cache.get_connection(&tpu_address);
            let result = conn.send_wire_transaction_async(wire_transaction.clone());
            if let Err(err) = result {
                last_error = Some(err);
            } else {
                some_success = true;
            }
        }
        if !some_success {
            Err(if let Some(err) = last_error {
                err
            } else {
                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
            })
        } else {
            Ok(())
        }
    }

    /// Send a batch of wire transactions to the current and upcoming leader TPUs according to
    /// fanout size
    /// Returns the last error if all sends fail
    fn try_send_wire_transaction_batch(
        &self,
        wire_transactions: Vec<Vec<u8>>,
    ) -> TransportResult<()> {
        let mut last_error: Option<TransportError> = None;
        let mut some_success = false;

        for tpu_address in self
            .leader_tpu_service
            .leader_tpu_sockets(self.fanout_slots)
        {
            let conn = self.connection_cache.get_connection(&tpu_address);
            let result = conn.send_wire_transaction_batch_async(wire_transactions.clone());
            if let Err(err) = result {
                last_error = Some(err);
            } else {
                some_success = true;
            }
        }
        if !some_success {
            Err(if let Some(err) = last_error {
                err
            } else {
                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
            })
        } else {
            Ok(())
        }
    }

    /// Create a new client that disconnects when dropped
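    ///
    /// # Examples
    ///
    /// A minimal sketch of constructing a client and sending a single
    /// transaction; the RPC and websocket URLs are placeholders and
    /// `transaction` is assumed to be a fully signed `Transaction` built
    /// elsewhere.
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let rpc_client = Arc::new(RpcClient::new("http://localhost:8899".to_string()));
    /// let tpu_client = TpuClient::new(
    ///     rpc_client,
    ///     "ws://localhost:8900",
    ///     TpuClientConfig::default(),
    /// )?;
    /// let delivered = tpu_client.send_transaction(&transaction);
    /// ```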
    pub fn new(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
    ) -> Result<Self> {
        let connection_cache = Arc::new(ConnectionCache::default());
        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
    }

    /// Create a new client that disconnects when dropped
    pub fn new_with_connection_cache(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
        connection_cache: Arc<ConnectionCache>,
    ) -> Result<Self> {
        let exit = Arc::new(AtomicBool::new(false));
        let leader_tpu_service =
            LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?;

        Ok(Self {
            _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
            fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
            leader_tpu_service,
            exit,
            rpc_client,
            connection_cache,
        })
    }

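    /// Sign the given messages with the latest blockhash and send them to the
    /// upcoming leader TPUs (falling back to RPC when a TPU send fails),
    /// periodically resending and polling signature statuses until every
    /// transaction satisfies the client's commitment level or the blockhash
    /// expires. Retries with a fresh blockhash for up to five rounds before
    /// returning an error; on success, returns each message's transaction
    /// error (or `None`) in input order.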
    pub fn send_and_confirm_messages_with_spinner<T: Signers>(
        &self,
        messages: &[Message],
        signers: &T,
    ) -> Result<Vec<Option<TransactionError>>> {
        let mut expired_blockhash_retries = 5;

        let progress_bar = spinner::new_progress_bar();
        progress_bar.set_message("Setting up...");

        let mut transactions = messages
            .iter()
            .enumerate()
            .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
            .collect::<Vec<_>>();
        let total_transactions = transactions.len();
        let mut transaction_errors = vec![None; transactions.len()];
        let mut confirmed_transactions = 0;
        let mut block_height = self.rpc_client.get_block_height()?;

        while expired_blockhash_retries > 0 {
            let (blockhash, last_valid_block_height) = self
                .rpc_client
                .get_latest_blockhash_with_commitment(self.rpc_client.commitment())?;

            let mut pending_transactions = HashMap::new();
            for (i, mut transaction) in transactions {
                transaction.try_sign(signers, blockhash)?;
                pending_transactions.insert(transaction.signatures[0], (i, transaction));
            }

            let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
            while block_height <= last_valid_block_height {
                let num_transactions = pending_transactions.len();

                // Periodically re-send all pending transactions
                if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
                    for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
                        if !self.send_transaction(transaction) {
                            let _result = self.rpc_client.send_transaction(transaction).ok();
                        }
                        spinner::set_message_for_confirmed_transactions(
                            &progress_bar,
                            confirmed_transactions,
                            total_transactions,
                            None, //block_height,
                            last_valid_block_height,
                            &format!("Sending {}/{} transactions", index + 1, num_transactions),
                        );
                        sleep(SEND_TRANSACTION_INTERVAL);
                    }
                    last_resend = Instant::now();
                }

                // Wait for the next block before checking for transaction statuses
                let mut block_height_refreshes = 10;
                spinner::set_message_for_confirmed_transactions(
                    &progress_bar,
                    confirmed_transactions,
                    total_transactions,
                    Some(block_height),
                    last_valid_block_height,
                    &format!("Waiting for next block, {} pending...", num_transactions),
                );
                let mut new_block_height = block_height;
                while block_height == new_block_height && block_height_refreshes > 0 {
                    sleep(Duration::from_millis(500));
                    new_block_height = self.rpc_client.get_block_height()?;
                    block_height_refreshes -= 1;
                }
                block_height = new_block_height;

                // Collect statuses for the transactions, drop those that are confirmed
                let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
                for pending_signatures_chunk in
                    pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
                {
                    if let Ok(result) = self
                        .rpc_client
                        .get_signature_statuses(pending_signatures_chunk)
                    {
                        let statuses = result.value;
                        for (signature, status) in
                            pending_signatures_chunk.iter().zip(statuses.into_iter())
                        {
                            if let Some(status) = status {
                                if status.satisfies_commitment(self.rpc_client.commitment()) {
                                    if let Some((i, _)) = pending_transactions.remove(signature) {
                                        confirmed_transactions += 1;
                                        if status.err.is_some() {
                                            progress_bar.println(format!(
                                                "Failed transaction: {:?}",
                                                status
                                            ));
                                        }
                                        transaction_errors[i] = status.err;
                                    }
                                }
                            }
                        }
                    }
                    spinner::set_message_for_confirmed_transactions(
                        &progress_bar,
                        confirmed_transactions,
                        total_transactions,
                        Some(block_height),
                        last_valid_block_height,
                        "Checking transaction status...",
                    );
                }

                if pending_transactions.is_empty() {
                    return Ok(transaction_errors);
                }
            }

            transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
            progress_bar.println(format!(
                "Blockhash expired. {} retries remaining",
                expired_blockhash_retries
            ));
            expired_blockhash_retries -= 1;
        }
        Err(TpuSenderError::Custom("Max retries exceeded".into()))
    }

    pub fn rpc_client(&self) -> &RpcClient {
        &self.rpc_client
    }
}

impl Drop for TpuClient {
    fn drop(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
        self.leader_tpu_service.join();
    }
}

pub(crate) struct LeaderTpuCacheUpdateInfo {
    pub(crate) maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
    pub(crate) maybe_epoch_info: Option<ClientResult<EpochInfo>>,
    pub(crate) maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
}
impl LeaderTpuCacheUpdateInfo {
    pub(crate) fn has_some(&self) -> bool {
        self.maybe_cluster_nodes.is_some()
            || self.maybe_epoch_info.is_some()
            || self.maybe_slot_leaders.is_some()
    }
}

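/// Cache of the upcoming slot leader schedule and the TPU socket address of
/// each known cluster node, refreshed periodically from RPC.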
pub(crate) struct LeaderTpuCache {
    first_slot: Slot,
    leaders: Vec<Pubkey>,
    leader_tpu_map: HashMap<Pubkey, SocketAddr>,
    slots_in_epoch: Slot,
    last_epoch_info_slot: Slot,
}

impl LeaderTpuCache {
    pub(crate) fn new(
        first_slot: Slot,
        slots_in_epoch: Slot,
        leaders: Vec<Pubkey>,
        cluster_nodes: Vec<RpcContactInfo>,
    ) -> Self {
        let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
        Self {
            first_slot,
            leaders,
            leader_tpu_map,
            slots_in_epoch,
            last_epoch_info_slot: first_slot,
        }
    }

    // Last slot that has a cached leader pubkey
    pub(crate) fn last_slot(&self) -> Slot {
        self.first_slot + self.leaders.len().saturating_sub(1) as u64
    }

    pub(crate) fn slot_info(&self) -> (Slot, Slot, Slot) {
        (
            self.last_slot(),
            self.last_epoch_info_slot,
            self.slots_in_epoch,
        )
    }

    // Get the TPU sockets for the current leader and upcoming leaders according to fanout size
    pub(crate) fn get_leader_sockets(
        &self,
        estimated_current_slot: Slot,
        fanout_slots: u64,
    ) -> Vec<SocketAddr> {
        let mut leader_set = HashSet::new();
        let mut leader_sockets = Vec::new();
        // `first_slot` might have been advanced since caller last read the `estimated_current_slot`
        // value. Take the greater of the two values to ensure we are reading from the latest
        // leader schedule.
        let current_slot = std::cmp::max(estimated_current_slot, self.first_slot);
        for leader_slot in current_slot..current_slot + fanout_slots {
            if let Some(leader) = self.get_slot_leader(leader_slot) {
                if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
                    if leader_set.insert(*leader) {
                        leader_sockets.push(*tpu_socket);
                    }
                } else {
                    // The leader is probably delinquent
                    trace!("TPU not available for leader {}", leader);
                }
            } else {
                // Overran the local leader schedule cache
                warn!(
                    "Leader not known for slot {}; cache holds slots [{},{}]",
                    leader_slot,
                    self.first_slot,
                    self.last_slot()
                );
            }
        }
        leader_sockets
    }

    pub(crate) fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
        if slot >= self.first_slot {
            let index = slot - self.first_slot;
            self.leaders.get(index as usize)
        } else {
            None
        }
    }

    pub(crate) fn extract_cluster_tpu_sockets(
        cluster_contact_info: Vec<RpcContactInfo>,
    ) -> HashMap<Pubkey, SocketAddr> {
        cluster_contact_info
            .into_iter()
            .filter_map(|contact_info| {
                Some((
                    Pubkey::from_str(&contact_info.pubkey).ok()?,
                    contact_info.tpu?,
                ))
            })
            .collect()
    }

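    // Number of upcoming slot leaders to fetch when refreshing the cache,
    // capped at the epoch length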
    pub(crate) fn fanout(slots_in_epoch: Slot) -> Slot {
        (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
    }

    pub(crate) fn update_all(
        &mut self,
        estimated_current_slot: Slot,
        cache_update_info: LeaderTpuCacheUpdateInfo,
    ) -> (bool, bool) {
        let mut has_error = false;
        let mut cluster_refreshed = false;
        if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
            match cluster_nodes {
                Ok(cluster_nodes) => {
                    let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
                    self.leader_tpu_map = leader_tpu_map;
                    cluster_refreshed = true;
                }
                Err(err) => {
                    warn!("Failed to fetch cluster tpu sockets: {}", err);
                    has_error = true;
                }
            }
        }

        if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
            self.slots_in_epoch = epoch_info.slots_in_epoch;
            self.last_epoch_info_slot = estimated_current_slot;
        }

        if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
            match slot_leaders {
                Ok(slot_leaders) => {
                    self.first_slot = estimated_current_slot;
                    self.leaders = slot_leaders;
                }
                Err(err) => {
                    warn!(
                        "Failed to fetch slot leaders (current estimated slot: {}): {}",
                        estimated_current_slot, err
                    );
                    has_error = true;
                }
            }
        }
        (has_error, cluster_refreshed)
    }
}

fn maybe_fetch_cache_info(
    leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
    last_cluster_refresh: Instant,
    rpc_client: &RpcClient,
    recent_slots: &RecentLeaderSlots,
) -> LeaderTpuCacheUpdateInfo {
    // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
    // or new validators come online
    let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
        Some(rpc_client.get_cluster_nodes())
    } else {
        None
    };

    let estimated_current_slot = recent_slots.estimated_current_slot();
    let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
        let leader_tpu_cache = leader_tpu_cache.read().unwrap();
        leader_tpu_cache.slot_info()
    };
    let maybe_epoch_info =
        if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
            Some(rpc_client.get_epoch_info())
        } else {
            None
        };

    let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
    {
        Some(rpc_client.get_slot_leaders(
            estimated_current_slot,
            LeaderTpuCache::fanout(slots_in_epoch),
        ))
    } else {
        None
    };
    LeaderTpuCacheUpdateInfo {
        maybe_cluster_nodes,
        maybe_epoch_info,
        maybe_slot_leaders,
    }
}

// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;

#[derive(Clone, Debug)]
pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
impl RecentLeaderSlots {
    pub(crate) fn new(current_slot: Slot) -> Self {
        let mut recent_slots = VecDeque::new();
        recent_slots.push_back(current_slot);
        Self(Arc::new(RwLock::new(recent_slots)))
    }

    pub(crate) fn record_slot(&self, current_slot: Slot) {
        let mut recent_slots = self.0.write().unwrap();
        recent_slots.push_back(current_slot);
        // 12 recent slots should be large enough to keep a misbehaving
        // validator from affecting the median recent slot
        while recent_slots.len() > 12 {
            recent_slots.pop_front();
        }
    }

    // Estimate the current slot from recent slot notifications.
    pub(crate) fn estimated_current_slot(&self) -> Slot {
        let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
        assert!(!recent_slots.is_empty());
        recent_slots.sort_unstable();

        // Validators can broadcast invalid blocks that are far in the future
        // so check if the current slot is in line with the recent progression.
        let max_index = recent_slots.len() - 1;
        let median_index = max_index / 2;
        let median_recent_slot = recent_slots[median_index];
        let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
        let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
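        // Example (from the tests below): with sorted recent slots
        // [1, 2, 3, 99, 100], the median is 3 and two higher samples remain, so
        // the expected current slot is 5 and anything above 5 + 48 = 53 is
        // rejected; 100 and 99 are skipped and 3 is returned.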

        // Return the highest slot that doesn't exceed what we believe is a
        // reasonable slot.
        recent_slots
            .into_iter()
            .rev()
            .find(|slot| *slot <= max_reasonable_current_slot)
            .unwrap()
    }
}

#[cfg(test)]
impl From<Vec<Slot>> for RecentLeaderSlots {
    fn from(recent_slots: Vec<Slot>) -> Self {
        assert!(!recent_slots.is_empty());
        Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
    }
}

/// Service that tracks upcoming leaders and maintains an up-to-date mapping
/// of leader id to TPU socket address.
struct LeaderTpuService {
    recent_slots: RecentLeaderSlots,
    leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
    subscription: Option<PubsubClientSubscription<SlotUpdate>>,
    t_leader_tpu_service: Option<JoinHandle<()>>,
}

impl LeaderTpuService {
    fn new(rpc_client: Arc<RpcClient>, websocket_url: &str, exit: Arc<AtomicBool>) -> Result<Self> {
        let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;

        let recent_slots = RecentLeaderSlots::new(start_slot);
        let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
        let leaders =
            rpc_client.get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))?;
        let cluster_nodes = rpc_client.get_cluster_nodes()?;
        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
            start_slot,
            slots_in_epoch,
            leaders,
            cluster_nodes,
        )));

        let subscription = if !websocket_url.is_empty() {
            let recent_slots = recent_slots.clone();
            Some(PubsubClient::slot_updates_subscribe(
                websocket_url,
                move |update| {
                    let current_slot = match update {
                        // This update indicates that a full slot was received by the connected
                        // node so we can stop sending transactions to the leader for that slot
                        SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
                        // This update indicates that we have just received the first shred from
                        // the leader for this slot and they are probably still accepting transactions.
                        SlotUpdate::FirstShredReceived { slot, .. } => slot,
                        _ => return,
                    };
                    recent_slots.record_slot(current_slot);
                },
            )?)
        } else {
            None
        };

        let t_leader_tpu_service = Some({
            let recent_slots = recent_slots.clone();
            let leader_tpu_cache = leader_tpu_cache.clone();
            std::thread::Builder::new()
                .name("ldr-tpu-srv".to_string())
                .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit))
                .unwrap()
        });

        Ok(LeaderTpuService {
            recent_slots,
            leader_tpu_cache,
            subscription,
            t_leader_tpu_service,
        })
    }

    fn join(&mut self) {
        if let Some(mut subscription) = self.subscription.take() {
            let _ = subscription.send_unsubscribe();
            let _ = subscription.shutdown();
        }
        if let Some(t_handle) = self.t_leader_tpu_service.take() {
            t_handle.join().unwrap();
        }
    }

    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
        let current_slot = self.recent_slots.estimated_current_slot();
        self.leader_tpu_cache
            .read()
            .unwrap()
            .get_leader_sockets(current_slot, fanout_slots)
    }

    fn run(
        rpc_client: Arc<RpcClient>,
        recent_slots: RecentLeaderSlots,
        leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
        exit: Arc<AtomicBool>,
    ) {
        let mut last_cluster_refresh = Instant::now();
        let mut sleep_ms = 1000;
        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }

            // Sleep a few slots before checking if leader cache needs to be refreshed again
            sleep(Duration::from_millis(sleep_ms));
            sleep_ms = 1000;

            let cache_update_info = maybe_fetch_cache_info(
                &leader_tpu_cache,
                last_cluster_refresh,
                &rpc_client,
                &recent_slots,
            );

            if cache_update_info.has_some() {
                let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
                let (has_error, cluster_refreshed) = leader_tpu_cache
                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
                if has_error {
                    sleep_ms = 100;
                }
                if cluster_refreshed {
                    last_cluster_refresh = Instant::now();
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
        assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
    }

    #[test]
    fn test_recent_leader_slots() {
        assert_slot(RecentLeaderSlots::new(0), 0);

        let mut recent_slots: Vec<Slot> = (1..=12).collect();
        assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);

        recent_slots.reverse();
        assert_slot(RecentLeaderSlots::from(recent_slots), 12);

        assert_slot(
            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
            1 + MAX_SLOT_SKIP_DISTANCE,
        );
        assert_slot(
            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
            0,
        );

        assert_slot(RecentLeaderSlots::from(vec![1]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
    }
}
764}