1use {
2 crate::{
3 client_error::{ClientError, Result as ClientResult},
4 connection_cache::ConnectionCache,
5 pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
6 rpc_client::RpcClient,
7 rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
8 rpc_response::{RpcContactInfo, SlotUpdate},
9 spinner,
10 tpu_connection::TpuConnection,
11 },
12 bincode::serialize,
13 log::*,
14 rayon::iter::{IntoParallelIterator, ParallelIterator},
15 solana_sdk::{
16 clock::Slot,
17 commitment_config::CommitmentConfig,
18 epoch_info::EpochInfo,
19 message::Message,
20 pubkey::Pubkey,
21 signature::SignerError,
22 signers::Signers,
23 transaction::{Transaction, TransactionError},
24 transport::{Result as TransportResult, TransportError},
25 },
26 std::{
27 collections::{HashMap, HashSet, VecDeque},
28 net::{SocketAddr, UdpSocket},
29 str::FromStr,
30 sync::{
31 atomic::{AtomicBool, Ordering},
32 Arc, RwLock,
33 },
34 thread::{sleep, JoinHandle},
35 },
36 thiserror::Error,
37 tokio::time::{Duration, Instant},
38};
39
40#[derive(Error, Debug)]
41pub enum TpuSenderError {
42 #[error("Pubsub error: {0:?}")]
43 PubsubError(#[from] PubsubClientError),
44 #[error("RPC error: {0:?}")]
45 RpcError(#[from] ClientError),
46 #[error("IO error: {0:?}")]
47 IoError(#[from] std::io::Error),
48 #[error("Signer error: {0:?}")]
49 SignerError(#[from] SignerError),
50 #[error("Custom error: {0}")]
51 Custom(String),
52}
53
54type Result<T> = std::result::Result<T, TpuSenderError>;
55
/// Fanout width, in slots, used by `TpuClientConfig::default()`.
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;

/// Largest fanout width a `TpuClient` will use; larger configured values are
/// reduced to this. Twice this value also caps how many slot leaders are
/// requested at a time (see `LeaderTpuCache::fanout`).
pub const MAX_FANOUT_SLOTS: u64 = 100;

/// Pause taken after each transaction sent by
/// `TpuClient::send_and_confirm_messages_with_spinner`.
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Pending transactions are re-sent once more than this much time has passed
/// since the previous round of sends.
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
66
67#[derive(Clone, Debug)]
69pub struct TpuClientConfig {
70 pub fanout_slots: u64,
73}
74
75impl Default for TpuClientConfig {
76 fn default() -> Self {
77 Self {
78 fanout_slots: DEFAULT_FANOUT_SLOTS,
79 }
80 }
81}
82
83pub struct TpuClient {
86 _deprecated: UdpSocket, fanout_slots: u64,
88 leader_tpu_service: LeaderTpuService,
89 exit: Arc<AtomicBool>,
90 rpc_client: Arc<RpcClient>,
91 connection_cache: Arc<ConnectionCache>,
92}
93
94impl TpuClient {
95 pub fn send_transaction(&self, transaction: &Transaction) -> bool {
98 let wire_transaction = serialize(transaction).expect("serialization should succeed");
99 self.send_wire_transaction(wire_transaction)
100 }
101
102 pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
104 self.try_send_wire_transaction(wire_transaction).is_ok()
105 }
106
107 pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
111 let wire_transaction = serialize(transaction).expect("serialization should succeed");
112 self.try_send_wire_transaction(wire_transaction)
113 }
114
115 pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
119 let wire_transactions = transactions
120 .into_par_iter()
121 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
122 .collect::<Vec<_>>();
123 self.try_send_wire_transaction_batch(wire_transactions)
124 }
125
126 fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
129 let mut last_error: Option<TransportError> = None;
130 let mut some_success = false;
131
132 for tpu_address in self
133 .leader_tpu_service
134 .leader_tpu_sockets(self.fanout_slots)
135 {
136 let conn = self.connection_cache.get_connection(&tpu_address);
137 let result = conn.send_wire_transaction_async(wire_transaction.clone());
138 if let Err(err) = result {
139 last_error = Some(err);
140 } else {
141 some_success = true;
142 }
143 }
144 if !some_success {
145 Err(if let Some(err) = last_error {
146 err
147 } else {
148 std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
149 })
150 } else {
151 Ok(())
152 }
153 }
154
155 fn try_send_wire_transaction_batch(
159 &self,
160 wire_transactions: Vec<Vec<u8>>,
161 ) -> TransportResult<()> {
162 let mut last_error: Option<TransportError> = None;
163 let mut some_success = false;
164
165 for tpu_address in self
166 .leader_tpu_service
167 .leader_tpu_sockets(self.fanout_slots)
168 {
169 let conn = self.connection_cache.get_connection(&tpu_address);
170 let result = conn.send_wire_transaction_batch_async(wire_transactions.clone());
171 if let Err(err) = result {
172 last_error = Some(err);
173 } else {
174 some_success = true;
175 }
176 }
177 if !some_success {
178 Err(if let Some(err) = last_error {
179 err
180 } else {
181 std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
182 })
183 } else {
184 Ok(())
185 }
186 }
187
188 pub fn new(
190 rpc_client: Arc<RpcClient>,
191 websocket_url: &str,
192 config: TpuClientConfig,
193 ) -> Result<Self> {
194 let connection_cache = Arc::new(ConnectionCache::default());
195 Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
196 }
197
198 pub fn new_with_connection_cache(
200 rpc_client: Arc<RpcClient>,
201 websocket_url: &str,
202 config: TpuClientConfig,
203 connection_cache: Arc<ConnectionCache>,
204 ) -> Result<Self> {
205 let exit = Arc::new(AtomicBool::new(false));
206 let leader_tpu_service =
207 LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?;
208
209 Ok(Self {
210 _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
211 fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
212 leader_tpu_service,
213 exit,
214 rpc_client,
215 connection_cache,
216 })
217 }
218
219 pub fn send_and_confirm_messages_with_spinner<T: Signers>(
220 &self,
221 messages: &[Message],
222 signers: &T,
223 ) -> Result<Vec<Option<TransactionError>>> {
224 let mut expired_blockhash_retries = 5;
225
226 let progress_bar = spinner::new_progress_bar();
227 progress_bar.set_message("Setting up...");
228
229 let mut transactions = messages
230 .iter()
231 .enumerate()
232 .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
233 .collect::<Vec<_>>();
234 let total_transactions = transactions.len();
235 let mut transaction_errors = vec![None; transactions.len()];
236 let mut confirmed_transactions = 0;
237 let mut block_height = self.rpc_client.get_block_height()?;
238
239 while expired_blockhash_retries > 0 {
240 let (blockhash, last_valid_block_height) = self
241 .rpc_client
242 .get_latest_blockhash_with_commitment(self.rpc_client.commitment())?;
243
244 let mut pending_transactions = HashMap::new();
245 for (i, mut transaction) in transactions {
246 transaction.try_sign(signers, blockhash)?;
247 pending_transactions.insert(transaction.signatures[0], (i, transaction));
248 }
249
250 let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
251 while block_height <= last_valid_block_height {
252 let num_transactions = pending_transactions.len();
253
254 if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
256 for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
257 if !self.send_transaction(transaction) {
258 let _result = self.rpc_client.send_transaction(transaction).ok();
259 }
260 spinner::set_message_for_confirmed_transactions(
261 &progress_bar,
262 confirmed_transactions,
263 total_transactions,
264 None, last_valid_block_height,
266 &format!("Sending {}/{} transactions", index + 1, num_transactions,),
267 );
268 sleep(SEND_TRANSACTION_INTERVAL);
269 }
270 last_resend = Instant::now();
271 }
272
273 let mut block_height_refreshes = 10;
275 spinner::set_message_for_confirmed_transactions(
276 &progress_bar,
277 confirmed_transactions,
278 total_transactions,
279 Some(block_height),
280 last_valid_block_height,
281 &format!("Waiting for next block, {} pending...", num_transactions),
282 );
283 let mut new_block_height = block_height;
284 while block_height == new_block_height && block_height_refreshes > 0 {
285 sleep(Duration::from_millis(500));
286 new_block_height = self.rpc_client.get_block_height()?;
287 block_height_refreshes -= 1;
288 }
289 block_height = new_block_height;
290
291 let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
293 for pending_signatures_chunk in
294 pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
295 {
296 if let Ok(result) = self
297 .rpc_client
298 .get_signature_statuses(pending_signatures_chunk)
299 {
300 let statuses = result.value;
301 for (signature, status) in
302 pending_signatures_chunk.iter().zip(statuses.into_iter())
303 {
304 if let Some(status) = status {
305 if status.satisfies_commitment(self.rpc_client.commitment()) {
306 if let Some((i, _)) = pending_transactions.remove(signature) {
307 confirmed_transactions += 1;
308 if status.err.is_some() {
309 progress_bar.println(format!(
310 "Failed transaction: {:?}",
311 status
312 ));
313 }
314 transaction_errors[i] = status.err;
315 }
316 }
317 }
318 }
319 }
320 spinner::set_message_for_confirmed_transactions(
321 &progress_bar,
322 confirmed_transactions,
323 total_transactions,
324 Some(block_height),
325 last_valid_block_height,
326 "Checking transaction status...",
327 );
328 }
329
330 if pending_transactions.is_empty() {
331 return Ok(transaction_errors);
332 }
333 }
334
335 transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
336 progress_bar.println(format!(
337 "Blockhash expired. {} retries remaining",
338 expired_blockhash_retries
339 ));
340 expired_blockhash_retries -= 1;
341 }
342 Err(TpuSenderError::Custom("Max retries exceeded".into()))
343 }
344
345 pub fn rpc_client(&self) -> &RpcClient {
346 &self.rpc_client
347 }
348}
349
350impl Drop for TpuClient {
351 fn drop(&mut self) {
352 self.exit.store(true, Ordering::Relaxed);
353 self.leader_tpu_service.join();
354 }
355}
356
357pub(crate) struct LeaderTpuCacheUpdateInfo {
358 pub(crate) maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
359 pub(crate) maybe_epoch_info: Option<ClientResult<EpochInfo>>,
360 pub(crate) maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
361}
362impl LeaderTpuCacheUpdateInfo {
363 pub(crate) fn has_some(&self) -> bool {
364 self.maybe_cluster_nodes.is_some()
365 || self.maybe_epoch_info.is_some()
366 || self.maybe_slot_leaders.is_some()
367 }
368}
369
370pub(crate) struct LeaderTpuCache {
371 first_slot: Slot,
372 leaders: Vec<Pubkey>,
373 leader_tpu_map: HashMap<Pubkey, SocketAddr>,
374 slots_in_epoch: Slot,
375 last_epoch_info_slot: Slot,
376}
377
378impl LeaderTpuCache {
379 pub(crate) fn new(
380 first_slot: Slot,
381 slots_in_epoch: Slot,
382 leaders: Vec<Pubkey>,
383 cluster_nodes: Vec<RpcContactInfo>,
384 ) -> Self {
385 let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
386 Self {
387 first_slot,
388 leaders,
389 leader_tpu_map,
390 slots_in_epoch,
391 last_epoch_info_slot: first_slot,
392 }
393 }
394
395 pub(crate) fn last_slot(&self) -> Slot {
397 self.first_slot + self.leaders.len().saturating_sub(1) as u64
398 }
399
400 pub(crate) fn slot_info(&self) -> (Slot, Slot, Slot) {
401 (
402 self.last_slot(),
403 self.last_epoch_info_slot,
404 self.slots_in_epoch,
405 )
406 }
407
408 pub(crate) fn get_leader_sockets(
410 &self,
411 estimated_current_slot: Slot,
412 fanout_slots: u64,
413 ) -> Vec<SocketAddr> {
414 let mut leader_set = HashSet::new();
415 let mut leader_sockets = Vec::new();
416 let current_slot = std::cmp::max(estimated_current_slot, self.first_slot);
420 for leader_slot in current_slot..current_slot + fanout_slots {
421 if let Some(leader) = self.get_slot_leader(leader_slot) {
422 if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
423 if leader_set.insert(*leader) {
424 leader_sockets.push(*tpu_socket);
425 }
426 } else {
427 trace!("TPU not available for leader {}", leader);
429 }
430 } else {
431 warn!(
433 "Leader not known for slot {}; cache holds slots [{},{}]",
434 leader_slot,
435 self.first_slot,
436 self.last_slot()
437 );
438 }
439 }
440 leader_sockets
441 }
442
443 pub(crate) fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
444 if slot >= self.first_slot {
445 let index = slot - self.first_slot;
446 self.leaders.get(index as usize)
447 } else {
448 None
449 }
450 }
451
452 pub(crate) fn extract_cluster_tpu_sockets(
453 cluster_contact_info: Vec<RpcContactInfo>,
454 ) -> HashMap<Pubkey, SocketAddr> {
455 cluster_contact_info
456 .into_iter()
457 .filter_map(|contact_info| {
458 Some((
459 Pubkey::from_str(&contact_info.pubkey).ok()?,
460 contact_info.tpu?,
461 ))
462 })
463 .collect()
464 }
465
466 pub(crate) fn fanout(slots_in_epoch: Slot) -> Slot {
467 (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
468 }
469
470 pub(crate) fn update_all(
471 &mut self,
472 estimated_current_slot: Slot,
473 cache_update_info: LeaderTpuCacheUpdateInfo,
474 ) -> (bool, bool) {
475 let mut has_error = false;
476 let mut cluster_refreshed = false;
477 if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
478 match cluster_nodes {
479 Ok(cluster_nodes) => {
480 let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
481 self.leader_tpu_map = leader_tpu_map;
482 cluster_refreshed = true;
483 }
484 Err(err) => {
485 warn!("Failed to fetch cluster tpu sockets: {}", err);
486 has_error = true;
487 }
488 }
489 }
490
491 if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
492 self.slots_in_epoch = epoch_info.slots_in_epoch;
493 self.last_epoch_info_slot = estimated_current_slot;
494 }
495
496 if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
497 match slot_leaders {
498 Ok(slot_leaders) => {
499 self.first_slot = estimated_current_slot;
500 self.leaders = slot_leaders;
501 }
502 Err(err) => {
503 warn!(
504 "Failed to fetch slot leaders (current estimated slot: {}): {}",
505 estimated_current_slot, err
506 );
507 has_error = true;
508 }
509 }
510 }
511 (has_error, cluster_refreshed)
512 }
513}
514
515fn maybe_fetch_cache_info(
516 leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
517 last_cluster_refresh: Instant,
518 rpc_client: &RpcClient,
519 recent_slots: &RecentLeaderSlots,
520) -> LeaderTpuCacheUpdateInfo {
521 let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
524 Some(rpc_client.get_cluster_nodes())
525 } else {
526 None
527 };
528
529 let estimated_current_slot = recent_slots.estimated_current_slot();
530 let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
531 let leader_tpu_cache = leader_tpu_cache.read().unwrap();
532 leader_tpu_cache.slot_info()
533 };
534 let maybe_epoch_info =
535 if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
536 Some(rpc_client.get_epoch_info())
537 } else {
538 None
539 };
540
541 let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
542 {
543 Some(rpc_client.get_slot_leaders(
544 estimated_current_slot,
545 LeaderTpuCache::fanout(slots_in_epoch),
546 ))
547 } else {
548 None
549 };
550 LeaderTpuCacheUpdateInfo {
551 maybe_cluster_nodes,
552 maybe_epoch_info,
553 maybe_slot_leaders,
554 }
555}
556
557const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
559
560#[derive(Clone, Debug)]
561pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
562impl RecentLeaderSlots {
563 pub(crate) fn new(current_slot: Slot) -> Self {
564 let mut recent_slots = VecDeque::new();
565 recent_slots.push_back(current_slot);
566 Self(Arc::new(RwLock::new(recent_slots)))
567 }
568
569 pub(crate) fn record_slot(&self, current_slot: Slot) {
570 let mut recent_slots = self.0.write().unwrap();
571 recent_slots.push_back(current_slot);
572 while recent_slots.len() > 12 {
575 recent_slots.pop_front();
576 }
577 }
578
579 pub(crate) fn estimated_current_slot(&self) -> Slot {
581 let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
582 assert!(!recent_slots.is_empty());
583 recent_slots.sort_unstable();
584
585 let max_index = recent_slots.len() - 1;
588 let median_index = max_index / 2;
589 let median_recent_slot = recent_slots[median_index];
590 let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
591 let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
592
593 recent_slots
596 .into_iter()
597 .rev()
598 .find(|slot| *slot <= max_reasonable_current_slot)
599 .unwrap()
600 }
601}
602
#[cfg(test)]
impl From<Vec<Slot>> for RecentLeaderSlots {
    /// Test-only constructor that seeds the window with `recent_slots`, in
    /// the given order.
    ///
    /// # Panics
    /// Panics if `recent_slots` is empty.
    fn from(recent_slots: Vec<Slot>) -> Self {
        assert!(!recent_slots.is_empty());
        let window = VecDeque::from(recent_slots);
        Self(Arc::new(RwLock::new(window)))
    }
}
610
611struct LeaderTpuService {
614 recent_slots: RecentLeaderSlots,
615 leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
616 subscription: Option<PubsubClientSubscription<SlotUpdate>>,
617 t_leader_tpu_service: Option<JoinHandle<()>>,
618}
619
620impl LeaderTpuService {
621 fn new(rpc_client: Arc<RpcClient>, websocket_url: &str, exit: Arc<AtomicBool>) -> Result<Self> {
622 let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
623
624 let recent_slots = RecentLeaderSlots::new(start_slot);
625 let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
626 let leaders =
627 rpc_client.get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))?;
628 let cluster_nodes = rpc_client.get_cluster_nodes()?;
629 let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
630 start_slot,
631 slots_in_epoch,
632 leaders,
633 cluster_nodes,
634 )));
635
636 let subscription = if !websocket_url.is_empty() {
637 let recent_slots = recent_slots.clone();
638 Some(PubsubClient::slot_updates_subscribe(
639 websocket_url,
640 move |update| {
641 let current_slot = match update {
642 SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
645 SlotUpdate::FirstShredReceived { slot, .. } => slot,
648 _ => return,
649 };
650 recent_slots.record_slot(current_slot);
651 },
652 )?)
653 } else {
654 None
655 };
656
657 let t_leader_tpu_service = Some({
658 let recent_slots = recent_slots.clone();
659 let leader_tpu_cache = leader_tpu_cache.clone();
660 std::thread::Builder::new()
661 .name("ldr-tpu-srv".to_string())
662 .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit))
663 .unwrap()
664 });
665
666 Ok(LeaderTpuService {
667 recent_slots,
668 leader_tpu_cache,
669 subscription,
670 t_leader_tpu_service,
671 })
672 }
673
674 fn join(&mut self) {
675 if let Some(mut subscription) = self.subscription.take() {
676 let _ = subscription.send_unsubscribe();
677 let _ = subscription.shutdown();
678 }
679 if let Some(t_handle) = self.t_leader_tpu_service.take() {
680 t_handle.join().unwrap();
681 }
682 }
683
684 fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
685 let current_slot = self.recent_slots.estimated_current_slot();
686 self.leader_tpu_cache
687 .read()
688 .unwrap()
689 .get_leader_sockets(current_slot, fanout_slots)
690 }
691
692 fn run(
693 rpc_client: Arc<RpcClient>,
694 recent_slots: RecentLeaderSlots,
695 leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
696 exit: Arc<AtomicBool>,
697 ) {
698 let mut last_cluster_refresh = Instant::now();
699 let mut sleep_ms = 1000;
700 loop {
701 if exit.load(Ordering::Relaxed) {
702 break;
703 }
704
705 sleep(Duration::from_millis(sleep_ms));
707 sleep_ms = 1000;
708
709 let cache_update_info = maybe_fetch_cache_info(
710 &leader_tpu_cache,
711 last_cluster_refresh,
712 &rpc_client,
713 &recent_slots,
714 );
715
716 if cache_update_info.has_some() {
717 let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
718 let (has_error, cluster_refreshed) = leader_tpu_cache
719 .update_all(recent_slots.estimated_current_slot(), cache_update_info);
720 if has_error {
721 sleep_ms = 100;
722 }
723 if cluster_refreshed {
724 last_cluster_refresh = Instant::now();
725 }
726 }
727 }
728 }
729}
730
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `recent_slots` estimates the current slot as
    /// `expected_slot`.
    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
        let estimated = recent_slots.estimated_current_slot();
        assert_eq!(estimated, expected_slot);
    }

    #[test]
    fn test_recent_leader_slots() {
        assert_slot(RecentLeaderSlots::new(0), 0);

        let ascending: Vec<Slot> = (1..=12).collect();
        let descending: Vec<Slot> = ascending.iter().rev().copied().collect();

        // Each case is (recorded slots, expected estimate).
        let cases: Vec<(Vec<Slot>, Slot)> = vec![
            (ascending, 12),
            (descending, 12),
            // A slot exactly at the skip-distance limit is accepted...
            (
                vec![0, 1 + MAX_SLOT_SKIP_DISTANCE],
                1 + MAX_SLOT_SKIP_DISTANCE,
            ),
            // ...and one slot past it is rejected.
            (vec![0, 2 + MAX_SLOT_SKIP_DISTANCE], 0),
            (vec![1], 1),
            (vec![1, 100], 1),
            (vec![1, 2, 100], 2),
            (vec![1, 2, 3, 100], 3),
            (vec![1, 2, 3, 99, 100], 3),
        ];
        for (slots, expected_slot) in cases {
            assert_slot(RecentLeaderSlots::from(slots), expected_slot);
        }
    }
}