1use {
7 crate::{
8 connection_cache::ConnectionCache, rpc_client::RpcClient,
9 rpc_config::RpcProgramAccountsConfig, rpc_response::Response,
10 tpu_connection::TpuConnection,
11 },
12 log::*,
13 solana_sdk::{
14 account::Account,
15 client::{AsyncClient, Client, SyncClient},
16 clock::{Slot, MAX_PROCESSING_AGE},
17 commitment_config::CommitmentConfig,
18 epoch_info::EpochInfo,
19 fee_calculator::{FeeCalculator, FeeRateGovernor},
20 hash::Hash,
21 instruction::Instruction,
22 message::Message,
23 pubkey::Pubkey,
24 signature::{Keypair, Signature, Signer},
25 signers::Signers,
26 system_instruction,
27 timing::duration_as_ms,
28 transaction::{self, Transaction, VersionedTransaction},
29 transport::Result as TransportResult,
30 },
31 std::{
32 io,
33 net::SocketAddr,
34 sync::{
35 atomic::{AtomicBool, AtomicUsize, Ordering},
36 Arc, RwLock,
37 },
38 time::{Duration, Instant},
39 },
40};
41
/// Bookkeeping used to pick the fastest of several clients.
///
/// Each client index is handed out once by `experiment`, its latency is recorded through
/// `report`, and the index with the smallest recorded time is then returned by `best`.
struct ClientOptimizer {
    /// Index currently returned by `best`; starts at 0 and is overwritten by `report`.
    cur_index: AtomicUsize,
    /// Counter of indices handed out so far by `experiment`.
    experiment_index: AtomicUsize,
    /// Set once `report` has selected a best index; afterwards only failure reports
    /// (`u64::MAX`) are processed.
    experiment_done: AtomicBool,
    /// Last reported time in milliseconds per client index; every slot starts at `u64::MAX`.
    times: RwLock<Vec<u64>>,
    /// Number of client slots taking part in the experiment.
    num_clients: usize,
}
49
/// Returns the smallest value in `array` together with the index of its first occurrence.
///
/// When the slice is empty, or every element equals `u64::MAX`, the result is `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .enumerate()
        .fold((u64::MAX, 0), |(best_time, best_pos), (pos, &time)| {
            // Strict comparison keeps the earliest index when several entries tie.
            if time < best_time {
                (time, pos)
            } else {
                (best_time, best_pos)
            }
        })
}
61
62impl ClientOptimizer {
63 fn new(num_clients: usize) -> Self {
64 Self {
65 cur_index: AtomicUsize::new(0),
66 experiment_index: AtomicUsize::new(0),
67 experiment_done: AtomicBool::new(false),
68 times: RwLock::new(vec![std::u64::MAX; num_clients]),
69 num_clients,
70 }
71 }
72
73 fn experiment(&self) -> usize {
74 if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
75 let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
76 if old < self.num_clients {
77 old
78 } else {
79 self.best()
80 }
81 } else {
82 self.best()
83 }
84 }
85
86 fn report(&self, index: usize, time_ms: u64) {
87 if self.num_clients > 1
88 && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
89 {
90 trace!(
91 "report {} with {} exp: {}",
92 index,
93 time_ms,
94 self.experiment_index.load(Ordering::Relaxed)
95 );
96
97 self.times.write().unwrap()[index] = time_ms;
98
99 if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
100 let times = self.times.read().unwrap();
101 let (min_time, min_index) = min_index(×);
102 trace!(
103 "done experimenting min: {} time: {} times: {:?}",
104 min_index,
105 min_time,
106 times
107 );
108
109 self.cur_index.store(min_index, Ordering::Relaxed);
111 self.experiment_done.store(true, Ordering::Relaxed);
112 }
113 }
114 }
115
116 fn best(&self) -> usize {
117 self.cur_index.load(Ordering::Relaxed)
118 }
119}
120
121pub struct ThinClient {
123 rpc_clients: Vec<RpcClient>,
124 tpu_addrs: Vec<SocketAddr>,
125 optimizer: ClientOptimizer,
126 connection_cache: Arc<ConnectionCache>,
127}
128
129impl ThinClient {
130 pub fn new(
134 rpc_addr: SocketAddr,
135 tpu_addr: SocketAddr,
136 connection_cache: Arc<ConnectionCache>,
137 ) -> Self {
138 Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
139 }
140
141 pub fn new_socket_with_timeout(
142 rpc_addr: SocketAddr,
143 tpu_addr: SocketAddr,
144 timeout: Duration,
145 connection_cache: Arc<ConnectionCache>,
146 ) -> Self {
147 let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
148 Self::new_from_client(rpc_client, tpu_addr, connection_cache)
149 }
150
151 fn new_from_client(
152 rpc_client: RpcClient,
153 tpu_addr: SocketAddr,
154 connection_cache: Arc<ConnectionCache>,
155 ) -> Self {
156 Self {
157 rpc_clients: vec![rpc_client],
158 tpu_addrs: vec![tpu_addr],
159 optimizer: ClientOptimizer::new(0),
160 connection_cache,
161 }
162 }
163
164 pub fn new_from_addrs(
165 rpc_addrs: Vec<SocketAddr>,
166 tpu_addrs: Vec<SocketAddr>,
167 connection_cache: Arc<ConnectionCache>,
168 ) -> Self {
169 assert!(!rpc_addrs.is_empty());
170 assert_eq!(rpc_addrs.len(), tpu_addrs.len());
171
172 let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
173 let optimizer = ClientOptimizer::new(rpc_clients.len());
174 Self {
175 rpc_clients,
176 tpu_addrs,
177 optimizer,
178 connection_cache,
179 }
180 }
181
182 fn tpu_addr(&self) -> &SocketAddr {
183 &self.tpu_addrs[self.optimizer.best()]
184 }
185
186 pub fn rpc_client(&self) -> &RpcClient {
187 &self.rpc_clients[self.optimizer.best()]
188 }
189
190 pub fn retry_transfer_until_confirmed(
192 &self,
193 keypair: &Keypair,
194 transaction: &mut Transaction,
195 tries: usize,
196 min_confirmed_blocks: usize,
197 ) -> TransportResult<Signature> {
198 self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
199 }
200
201 pub fn retry_transfer(
203 &self,
204 keypair: &Keypair,
205 transaction: &mut Transaction,
206 tries: usize,
207 ) -> TransportResult<Signature> {
208 self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
209 }
210
211 pub fn send_and_confirm_transaction<T: Signers>(
212 &self,
213 keypairs: &T,
214 transaction: &mut Transaction,
215 tries: usize,
216 pending_confirmations: usize,
217 ) -> TransportResult<Signature> {
218 for x in 0..tries {
219 let now = Instant::now();
220 let mut num_confirmed = 0;
221 let mut wait_time = MAX_PROCESSING_AGE;
222 let wire_transaction =
224 bincode::serialize(&transaction).expect("transaction serialization failed");
225 while now.elapsed().as_secs() < wait_time as u64 {
226 if num_confirmed == 0 {
227 let conn = self.connection_cache.get_connection(self.tpu_addr());
228 conn.send_wire_transaction(&wire_transaction)?;
230 }
231
232 if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
233 &transaction.signatures[0],
234 pending_confirmations,
235 ) {
236 num_confirmed = confirmed_blocks;
237 if confirmed_blocks >= pending_confirmations {
238 return Ok(transaction.signatures[0]);
239 }
240 wait_time = wait_time.max(
244 MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
245 );
246 }
247 }
248 info!("{} tries failed transfer to {}", x, self.tpu_addr());
249 let blockhash = self.get_latest_blockhash()?;
250 transaction.sign(keypairs, blockhash);
251 }
252 Err(io::Error::new(
253 io::ErrorKind::Other,
254 format!("retry_transfer failed in {} retries", tries),
255 )
256 .into())
257 }
258
259 pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
260 self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
261 }
262
263 pub fn poll_get_balance_with_commitment(
264 &self,
265 pubkey: &Pubkey,
266 commitment_config: CommitmentConfig,
267 ) -> TransportResult<u64> {
268 self.rpc_client()
269 .poll_get_balance_with_commitment(pubkey, commitment_config)
270 .map_err(|e| e.into())
271 }
272
273 pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
274 self.rpc_client().wait_for_balance_with_commitment(
275 pubkey,
276 expected_balance,
277 CommitmentConfig::default(),
278 )
279 }
280
281 pub fn get_program_accounts_with_config(
282 &self,
283 pubkey: &Pubkey,
284 config: RpcProgramAccountsConfig,
285 ) -> TransportResult<Vec<(Pubkey, Account)>> {
286 self.rpc_client()
287 .get_program_accounts_with_config(pubkey, config)
288 .map_err(|e| e.into())
289 }
290
291 pub fn wait_for_balance_with_commitment(
292 &self,
293 pubkey: &Pubkey,
294 expected_balance: Option<u64>,
295 commitment_config: CommitmentConfig,
296 ) -> Option<u64> {
297 self.rpc_client().wait_for_balance_with_commitment(
298 pubkey,
299 expected_balance,
300 commitment_config,
301 )
302 }
303
304 pub fn poll_for_signature_with_commitment(
305 &self,
306 signature: &Signature,
307 commitment_config: CommitmentConfig,
308 ) -> TransportResult<()> {
309 self.rpc_client()
310 .poll_for_signature_with_commitment(signature, commitment_config)
311 .map_err(|e| e.into())
312 }
313
314 pub fn get_num_blocks_since_signature_confirmation(
315 &mut self,
316 sig: &Signature,
317 ) -> TransportResult<usize> {
318 self.rpc_client()
319 .get_num_blocks_since_signature_confirmation(sig)
320 .map_err(|e| e.into())
321 }
322}
323
324impl Client for ThinClient {
325 fn tpu_addr(&self) -> String {
326 self.tpu_addr().to_string()
327 }
328}
329
330impl SyncClient for ThinClient {
331 fn send_and_confirm_message<T: Signers>(
332 &self,
333 keypairs: &T,
334 message: Message,
335 ) -> TransportResult<Signature> {
336 let blockhash = self.get_latest_blockhash()?;
337 let mut transaction = Transaction::new(keypairs, message, blockhash);
338 let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
339 Ok(signature)
340 }
341
342 fn send_and_confirm_instruction(
343 &self,
344 keypair: &Keypair,
345 instruction: Instruction,
346 ) -> TransportResult<Signature> {
347 let message = Message::new(&[instruction], Some(&keypair.pubkey()));
348 self.send_and_confirm_message(&[keypair], message)
349 }
350
351 fn transfer_and_confirm(
352 &self,
353 lamports: u64,
354 keypair: &Keypair,
355 pubkey: &Pubkey,
356 ) -> TransportResult<Signature> {
357 let transfer_instruction =
358 system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
359 self.send_and_confirm_instruction(keypair, transfer_instruction)
360 }
361
362 fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
363 Ok(self.rpc_client().get_account_data(pubkey).ok())
364 }
365
366 fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
367 let account = self.rpc_client().get_account(pubkey);
368 match account {
369 Ok(value) => Ok(Some(value)),
370 Err(_) => Ok(None),
371 }
372 }
373
374 fn get_account_with_commitment(
375 &self,
376 pubkey: &Pubkey,
377 commitment_config: CommitmentConfig,
378 ) -> TransportResult<Option<Account>> {
379 self.rpc_client()
380 .get_account_with_commitment(pubkey, commitment_config)
381 .map_err(|e| e.into())
382 .map(|r| r.value)
383 }
384
385 fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
386 self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
387 }
388
389 fn get_balance_with_commitment(
390 &self,
391 pubkey: &Pubkey,
392 commitment_config: CommitmentConfig,
393 ) -> TransportResult<u64> {
394 self.rpc_client()
395 .get_balance_with_commitment(pubkey, commitment_config)
396 .map_err(|e| e.into())
397 .map(|r| r.value)
398 }
399
400 fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
401 self.rpc_client()
402 .get_minimum_balance_for_rent_exemption(data_len)
403 .map_err(|e| e.into())
404 }
405
406 fn get_recent_blockhash(&self) -> TransportResult<(Hash, FeeCalculator)> {
407 #[allow(deprecated)]
408 let (blockhash, fee_calculator, _last_valid_slot) =
409 self.get_recent_blockhash_with_commitment(CommitmentConfig::default())?;
410 Ok((blockhash, fee_calculator))
411 }
412
413 fn get_recent_blockhash_with_commitment(
414 &self,
415 commitment_config: CommitmentConfig,
416 ) -> TransportResult<(Hash, FeeCalculator, Slot)> {
417 let index = self.optimizer.experiment();
418 let now = Instant::now();
419 #[allow(deprecated)]
420 let recent_blockhash =
421 self.rpc_clients[index].get_recent_blockhash_with_commitment(commitment_config);
422 match recent_blockhash {
423 Ok(Response { value, .. }) => {
424 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
425 Ok((value.0, value.1, value.2))
426 }
427 Err(e) => {
428 self.optimizer.report(index, std::u64::MAX);
429 Err(e.into())
430 }
431 }
432 }
433
434 fn get_fee_calculator_for_blockhash(
435 &self,
436 blockhash: &Hash,
437 ) -> TransportResult<Option<FeeCalculator>> {
438 #[allow(deprecated)]
439 self.rpc_client()
440 .get_fee_calculator_for_blockhash(blockhash)
441 .map_err(|e| e.into())
442 }
443
444 fn get_fee_rate_governor(&self) -> TransportResult<FeeRateGovernor> {
445 #[allow(deprecated)]
446 self.rpc_client()
447 .get_fee_rate_governor()
448 .map_err(|e| e.into())
449 .map(|r| r.value)
450 }
451
452 fn get_signature_status(
453 &self,
454 signature: &Signature,
455 ) -> TransportResult<Option<transaction::Result<()>>> {
456 let status = self
457 .rpc_client()
458 .get_signature_status(signature)
459 .map_err(|err| {
460 io::Error::new(
461 io::ErrorKind::Other,
462 format!("send_transaction failed with error {:?}", err),
463 )
464 })?;
465 Ok(status)
466 }
467
468 fn get_signature_status_with_commitment(
469 &self,
470 signature: &Signature,
471 commitment_config: CommitmentConfig,
472 ) -> TransportResult<Option<transaction::Result<()>>> {
473 let status = self
474 .rpc_client()
475 .get_signature_status_with_commitment(signature, commitment_config)
476 .map_err(|err| {
477 io::Error::new(
478 io::ErrorKind::Other,
479 format!("send_transaction failed with error {:?}", err),
480 )
481 })?;
482 Ok(status)
483 }
484
485 fn get_slot(&self) -> TransportResult<u64> {
486 self.get_slot_with_commitment(CommitmentConfig::default())
487 }
488
489 fn get_slot_with_commitment(
490 &self,
491 commitment_config: CommitmentConfig,
492 ) -> TransportResult<u64> {
493 let slot = self
494 .rpc_client()
495 .get_slot_with_commitment(commitment_config)
496 .map_err(|err| {
497 io::Error::new(
498 io::ErrorKind::Other,
499 format!("send_transaction failed with error {:?}", err),
500 )
501 })?;
502 Ok(slot)
503 }
504
505 fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
506 self.rpc_client().get_epoch_info().map_err(|e| e.into())
507 }
508
509 fn get_transaction_count(&self) -> TransportResult<u64> {
510 let index = self.optimizer.experiment();
511 let now = Instant::now();
512 match self.rpc_client().get_transaction_count() {
513 Ok(transaction_count) => {
514 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
515 Ok(transaction_count)
516 }
517 Err(e) => {
518 self.optimizer.report(index, std::u64::MAX);
519 Err(e.into())
520 }
521 }
522 }
523
524 fn get_transaction_count_with_commitment(
525 &self,
526 commitment_config: CommitmentConfig,
527 ) -> TransportResult<u64> {
528 let index = self.optimizer.experiment();
529 let now = Instant::now();
530 match self
531 .rpc_client()
532 .get_transaction_count_with_commitment(commitment_config)
533 {
534 Ok(transaction_count) => {
535 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
536 Ok(transaction_count)
537 }
538 Err(e) => {
539 self.optimizer.report(index, std::u64::MAX);
540 Err(e.into())
541 }
542 }
543 }
544
545 fn poll_for_signature_confirmation(
547 &self,
548 signature: &Signature,
549 min_confirmed_blocks: usize,
550 ) -> TransportResult<usize> {
551 self.rpc_client()
552 .poll_for_signature_confirmation(signature, min_confirmed_blocks)
553 .map_err(|e| e.into())
554 }
555
556 fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
557 self.rpc_client()
558 .poll_for_signature(signature)
559 .map_err(|e| e.into())
560 }
561
562 fn get_new_blockhash(&self, blockhash: &Hash) -> TransportResult<(Hash, FeeCalculator)> {
563 #[allow(deprecated)]
564 self.rpc_client()
565 .get_new_blockhash(blockhash)
566 .map_err(|e| e.into())
567 }
568
569 fn get_latest_blockhash(&self) -> TransportResult<Hash> {
570 let (blockhash, _) =
571 self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
572 Ok(blockhash)
573 }
574
575 fn get_latest_blockhash_with_commitment(
576 &self,
577 commitment_config: CommitmentConfig,
578 ) -> TransportResult<(Hash, u64)> {
579 let index = self.optimizer.experiment();
580 let now = Instant::now();
581 match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
582 Ok((blockhash, last_valid_block_height)) => {
583 self.optimizer.report(index, duration_as_ms(&now.elapsed()));
584 Ok((blockhash, last_valid_block_height))
585 }
586 Err(e) => {
587 self.optimizer.report(index, std::u64::MAX);
588 Err(e.into())
589 }
590 }
591 }
592
593 fn is_blockhash_valid(
594 &self,
595 blockhash: &Hash,
596 commitment_config: CommitmentConfig,
597 ) -> TransportResult<bool> {
598 self.rpc_client()
599 .is_blockhash_valid(blockhash, commitment_config)
600 .map_err(|e| e.into())
601 }
602
603 fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
604 self.rpc_client()
605 .get_fee_for_message(message)
606 .map_err(|e| e.into())
607 }
608}
609
610impl AsyncClient for ThinClient {
611 fn async_send_versioned_transaction(
612 &self,
613 transaction: VersionedTransaction,
614 ) -> TransportResult<Signature> {
615 let conn = self.connection_cache.get_connection(self.tpu_addr());
616 conn.serialize_and_send_transaction(&transaction)?;
617 Ok(transaction.signatures[0])
618 }
619
620 fn async_send_versioned_transaction_batch(
621 &self,
622 batch: Vec<VersionedTransaction>,
623 ) -> TransportResult<()> {
624 let conn = self.connection_cache.get_connection(self.tpu_addr());
625 conn.par_serialize_and_send_transaction_batch(&batch[..])?;
626 Ok(())
627 }
628}
629
#[cfg(test)]
mod tests {
    use {super::*, rayon::prelude::*};

    /// Checks that indices are handed out exactly once, that the fastest client is selected,
    /// and that a failure report moves the selection to the next fastest client.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Claim the experiment indices concurrently: each index must be handed out once.
        let mut indices: Vec<usize> = (0..NUM_CLIENTS)
            .into_par_iter()
            .map(|_| optimizer.experiment())
            .collect();
        indices.sort_unstable();
        assert_eq!(indices, (0..NUM_CLIENTS).collect::<Vec<_>>());

        // Report in ascending order. `report` finishes the experiment when the last index
        // arrives and drops later regular reports, so reporting concurrently could leave
        // earlier slots unrecorded and make the assertions below nondeterministic.
        for index in indices {
            optimizer.report(index, (NUM_CLIENTS - index) as u64);
        }

        // The experiment is over: a regular report must not change the selection.
        let index = optimizer.experiment();
        optimizer.report(index, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // A failure of the best client moves the selection to the next fastest one.
        optimizer.report(optimizer.best(), u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}