1use {
7 log::*,
8 rayon::iter::{IntoParallelIterator, ParallelIterator},
9 solana_account::Account,
10 solana_client_traits::{AsyncClient, Client, SyncClient},
11 solana_clock::MAX_PROCESSING_AGE,
12 solana_commitment_config::CommitmentConfig,
13 solana_connection_cache::{
14 client_connection::ClientConnection,
15 connection_cache::{
16 ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
17 },
18 },
19 solana_epoch_info::EpochInfo,
20 solana_hash::Hash,
21 solana_instruction::Instruction,
22 solana_keypair::Keypair,
23 solana_message::Message,
24 solana_pubkey::Pubkey,
25 solana_rpc_client::rpc_client::RpcClient,
26 solana_rpc_client_api::config::RpcProgramAccountsConfig,
27 solana_signature::Signature,
28 solana_signer::{signers::Signers, Signer},
29 solana_system_interface::instruction::transfer,
30 solana_transaction::{versioned::VersionedTransaction, Transaction},
31 solana_transaction_error::{TransactionResult, TransportResult},
32 std::{
33 io,
34 net::SocketAddr,
35 sync::{
36 atomic::{AtomicBool, AtomicUsize, Ordering},
37 Arc, RwLock,
38 },
39 time::{Duration, Instant},
40 },
41};
42
/// Shared bookkeeping used to choose which of `num_clients` endpoints to talk to.
///
/// Every field is either atomic or lock-protected, so the state is updated through `&self`.
struct ClientOptimizer {
    /// Index currently considered the best choice; this is what `best()` returns.
    cur_index: AtomicUsize,
    /// Next untried index that `experiment()` hands out.
    experiment_index: AtomicUsize,
    /// Set by `report()` once a best index has been selected.
    experiment_done: AtomicBool,
    /// Last reported time per index in milliseconds; `u64::MAX` marks "not measured / failed".
    times: RwLock<Vec<u64>>,
    /// Number of indices taking part in the experiment.
    num_clients: usize,
}
50
51impl ClientOptimizer {
52 fn new(num_clients: usize) -> Self {
53 Self {
54 cur_index: AtomicUsize::new(0),
55 experiment_index: AtomicUsize::new(0),
56 experiment_done: AtomicBool::new(false),
57 times: RwLock::new(vec![u64::MAX; num_clients]),
58 num_clients,
59 }
60 }
61
62 fn experiment(&self) -> usize {
63 if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
64 let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
65 if old < self.num_clients {
66 old
67 } else {
68 self.best()
69 }
70 } else {
71 self.best()
72 }
73 }
74
75 fn report(&self, index: usize, time_ms: u64) {
76 if self.num_clients > 1
77 && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == u64::MAX)
78 {
79 trace!(
80 "report {} with {} exp: {}",
81 index,
82 time_ms,
83 self.experiment_index.load(Ordering::Relaxed)
84 );
85
86 self.times.write().unwrap()[index] = time_ms;
87
88 if index == (self.num_clients - 1) || time_ms == u64::MAX {
89 let times = self.times.read().unwrap();
90 let (min_time, min_index) = min_index(×);
91 trace!(
92 "done experimenting min: {} time: {} times: {:?}",
93 min_index,
94 min_time,
95 times
96 );
97
98 self.cur_index.store(min_index, Ordering::Relaxed);
100 self.experiment_done.store(true, Ordering::Relaxed);
101 }
102 }
103 }
104
105 fn best(&self) -> usize {
106 self.cur_index.load(Ordering::Relaxed)
107 }
108}
109
110#[deprecated(since = "2.0.0", note = "Use [RpcClient] or [TpuClient] instead.")]
112pub struct ThinClient<
113 P, M, C, > {
117 rpc_clients: Vec<RpcClient>,
118 tpu_addrs: Vec<SocketAddr>,
119 optimizer: ClientOptimizer,
120 connection_cache: Arc<ConnectionCache<P, M, C>>,
121}
122
123#[allow(deprecated)]
124impl<P, M, C> ThinClient<P, M, C>
125where
126 P: ConnectionPool<NewConnectionConfig = C>,
127 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
128 C: NewConnectionConfig,
129{
130 pub fn new(
134 rpc_addr: SocketAddr,
135 tpu_addr: SocketAddr,
136 connection_cache: Arc<ConnectionCache<P, M, C>>,
137 ) -> Self {
138 Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
139 }
140
141 pub fn new_socket_with_timeout(
142 rpc_addr: SocketAddr,
143 tpu_addr: SocketAddr,
144 timeout: Duration,
145 connection_cache: Arc<ConnectionCache<P, M, C>>,
146 ) -> Self {
147 let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
148 Self::new_from_client(rpc_client, tpu_addr, connection_cache)
149 }
150
151 fn new_from_client(
152 rpc_client: RpcClient,
153 tpu_addr: SocketAddr,
154 connection_cache: Arc<ConnectionCache<P, M, C>>,
155 ) -> Self {
156 Self {
157 rpc_clients: vec![rpc_client],
158 tpu_addrs: vec![tpu_addr],
159 optimizer: ClientOptimizer::new(0),
160 connection_cache,
161 }
162 }
163
164 pub fn new_from_addrs(
165 rpc_addrs: Vec<SocketAddr>,
166 tpu_addrs: Vec<SocketAddr>,
167 connection_cache: Arc<ConnectionCache<P, M, C>>,
168 ) -> Self {
169 assert!(!rpc_addrs.is_empty());
170 assert_eq!(rpc_addrs.len(), tpu_addrs.len());
171
172 let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
173 let optimizer = ClientOptimizer::new(rpc_clients.len());
174 Self {
175 rpc_clients,
176 tpu_addrs,
177 optimizer,
178 connection_cache,
179 }
180 }
181
182 fn tpu_addr(&self) -> &SocketAddr {
183 &self.tpu_addrs[self.optimizer.best()]
184 }
185
186 pub fn rpc_client(&self) -> &RpcClient {
187 &self.rpc_clients[self.optimizer.best()]
188 }
189
190 pub fn retry_transfer_until_confirmed(
192 &self,
193 keypair: &Keypair,
194 transaction: &mut Transaction,
195 tries: usize,
196 min_confirmed_blocks: usize,
197 ) -> TransportResult<Signature> {
198 self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
199 }
200
201 pub fn retry_transfer(
203 &self,
204 keypair: &Keypair,
205 transaction: &mut Transaction,
206 tries: usize,
207 ) -> TransportResult<Signature> {
208 self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
209 }
210
211 pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
212 &self,
213 keypairs: &T,
214 transaction: &mut Transaction,
215 tries: usize,
216 pending_confirmations: usize,
217 ) -> TransportResult<Signature> {
218 for x in 0..tries {
219 let now = Instant::now();
220 let mut num_confirmed = 0;
221 let mut wait_time = MAX_PROCESSING_AGE;
222 let wire_transaction =
224 bincode::serialize(&transaction).expect("transaction serialization failed");
225 while now.elapsed().as_secs() < wait_time as u64 {
226 if num_confirmed == 0 {
227 let conn = self.connection_cache.get_connection(self.tpu_addr());
228 #[allow(clippy::needless_borrow)]
230 conn.send_data(&wire_transaction)?;
231 }
232
233 if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
234 &transaction.signatures[0],
235 pending_confirmations,
236 ) {
237 num_confirmed = confirmed_blocks;
238 if confirmed_blocks >= pending_confirmations {
239 return Ok(transaction.signatures[0]);
240 }
241 wait_time = wait_time.max(
245 MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
246 );
247 }
248 }
249 info!("{} tries failed transfer to {}", x, self.tpu_addr());
250 let blockhash = self.get_latest_blockhash()?;
251 transaction.sign(keypairs, blockhash);
252 }
253 Err(io::Error::new(
254 io::ErrorKind::Other,
255 format!("retry_transfer failed in {tries} retries"),
256 )
257 .into())
258 }
259
260 pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
261 self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
262 }
263
264 pub fn poll_get_balance_with_commitment(
265 &self,
266 pubkey: &Pubkey,
267 commitment_config: CommitmentConfig,
268 ) -> TransportResult<u64> {
269 self.rpc_client()
270 .poll_get_balance_with_commitment(pubkey, commitment_config)
271 .map_err(|e| e.into())
272 }
273
274 pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
275 self.rpc_client().wait_for_balance_with_commitment(
276 pubkey,
277 expected_balance,
278 CommitmentConfig::default(),
279 )
280 }
281
282 pub fn get_program_accounts_with_config(
283 &self,
284 pubkey: &Pubkey,
285 config: RpcProgramAccountsConfig,
286 ) -> TransportResult<Vec<(Pubkey, Account)>> {
287 self.rpc_client()
288 .get_program_accounts_with_config(pubkey, config)
289 .map_err(|e| e.into())
290 }
291
292 pub fn wait_for_balance_with_commitment(
293 &self,
294 pubkey: &Pubkey,
295 expected_balance: Option<u64>,
296 commitment_config: CommitmentConfig,
297 ) -> Option<u64> {
298 self.rpc_client().wait_for_balance_with_commitment(
299 pubkey,
300 expected_balance,
301 commitment_config,
302 )
303 }
304
305 pub fn poll_for_signature_with_commitment(
306 &self,
307 signature: &Signature,
308 commitment_config: CommitmentConfig,
309 ) -> TransportResult<()> {
310 self.rpc_client()
311 .poll_for_signature_with_commitment(signature, commitment_config)
312 .map_err(|e| e.into())
313 }
314
315 pub fn get_num_blocks_since_signature_confirmation(
316 &mut self,
317 sig: &Signature,
318 ) -> TransportResult<usize> {
319 self.rpc_client()
320 .get_num_blocks_since_signature_confirmation(sig)
321 .map_err(|e| e.into())
322 }
323}
324
325#[allow(deprecated)]
326impl<P, M, C> Client for ThinClient<P, M, C>
327where
328 P: ConnectionPool<NewConnectionConfig = C>,
329 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
330 C: NewConnectionConfig,
331{
332 fn tpu_addr(&self) -> String {
333 self.tpu_addr().to_string()
334 }
335}
336
337#[allow(deprecated)]
338impl<P, M, C> SyncClient for ThinClient<P, M, C>
339where
340 P: ConnectionPool<NewConnectionConfig = C>,
341 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
342 C: NewConnectionConfig,
343{
344 fn send_and_confirm_message<T: Signers + ?Sized>(
345 &self,
346 keypairs: &T,
347 message: Message,
348 ) -> TransportResult<Signature> {
349 let blockhash = self.get_latest_blockhash()?;
350 let mut transaction = Transaction::new(keypairs, message, blockhash);
351 let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
352 Ok(signature)
353 }
354
355 fn send_and_confirm_instruction(
356 &self,
357 keypair: &Keypair,
358 instruction: Instruction,
359 ) -> TransportResult<Signature> {
360 let message = Message::new(&[instruction], Some(&keypair.pubkey()));
361 self.send_and_confirm_message(&[keypair], message)
362 }
363
364 fn transfer_and_confirm(
365 &self,
366 lamports: u64,
367 keypair: &Keypair,
368 pubkey: &Pubkey,
369 ) -> TransportResult<Signature> {
370 let transfer_instruction = transfer(&keypair.pubkey(), pubkey, lamports);
371 self.send_and_confirm_instruction(keypair, transfer_instruction)
372 }
373
374 fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
375 Ok(self.rpc_client().get_account_data(pubkey).ok())
376 }
377
378 fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
379 let account = self.rpc_client().get_account(pubkey);
380 match account {
381 Ok(value) => Ok(Some(value)),
382 Err(_) => Ok(None),
383 }
384 }
385
386 fn get_account_with_commitment(
387 &self,
388 pubkey: &Pubkey,
389 commitment_config: CommitmentConfig,
390 ) -> TransportResult<Option<Account>> {
391 self.rpc_client()
392 .get_account_with_commitment(pubkey, commitment_config)
393 .map_err(|e| e.into())
394 .map(|r| r.value)
395 }
396
397 fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
398 self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
399 }
400
401 fn get_balance_with_commitment(
402 &self,
403 pubkey: &Pubkey,
404 commitment_config: CommitmentConfig,
405 ) -> TransportResult<u64> {
406 self.rpc_client()
407 .get_balance_with_commitment(pubkey, commitment_config)
408 .map_err(|e| e.into())
409 .map(|r| r.value)
410 }
411
412 fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
413 self.rpc_client()
414 .get_minimum_balance_for_rent_exemption(data_len)
415 .map_err(|e| e.into())
416 }
417
418 fn get_signature_status(
419 &self,
420 signature: &Signature,
421 ) -> TransportResult<Option<TransactionResult<()>>> {
422 let status = self
423 .rpc_client()
424 .get_signature_status(signature)
425 .map_err(|err| {
426 io::Error::new(
427 io::ErrorKind::Other,
428 format!("send_transaction failed with error {err:?}"),
429 )
430 })?;
431 Ok(status)
432 }
433
434 fn get_signature_status_with_commitment(
435 &self,
436 signature: &Signature,
437 commitment_config: CommitmentConfig,
438 ) -> TransportResult<Option<TransactionResult<()>>> {
439 let status = self
440 .rpc_client()
441 .get_signature_status_with_commitment(signature, commitment_config)
442 .map_err(|err| {
443 io::Error::new(
444 io::ErrorKind::Other,
445 format!("send_transaction failed with error {err:?}"),
446 )
447 })?;
448 Ok(status)
449 }
450
451 fn get_slot(&self) -> TransportResult<u64> {
452 self.get_slot_with_commitment(CommitmentConfig::default())
453 }
454
455 fn get_slot_with_commitment(
456 &self,
457 commitment_config: CommitmentConfig,
458 ) -> TransportResult<u64> {
459 let slot = self
460 .rpc_client()
461 .get_slot_with_commitment(commitment_config)
462 .map_err(|err| {
463 io::Error::new(
464 io::ErrorKind::Other,
465 format!("send_transaction failed with error {err:?}"),
466 )
467 })?;
468 Ok(slot)
469 }
470
471 fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
472 self.rpc_client().get_epoch_info().map_err(|e| e.into())
473 }
474
475 fn get_transaction_count(&self) -> TransportResult<u64> {
476 let index = self.optimizer.experiment();
477 let now = Instant::now();
478 match self.rpc_client().get_transaction_count() {
479 Ok(transaction_count) => {
480 self.optimizer
481 .report(index, now.elapsed().as_millis() as u64);
482 Ok(transaction_count)
483 }
484 Err(e) => {
485 self.optimizer.report(index, u64::MAX);
486 Err(e.into())
487 }
488 }
489 }
490
491 fn get_transaction_count_with_commitment(
492 &self,
493 commitment_config: CommitmentConfig,
494 ) -> TransportResult<u64> {
495 let index = self.optimizer.experiment();
496 let now = Instant::now();
497 match self
498 .rpc_client()
499 .get_transaction_count_with_commitment(commitment_config)
500 {
501 Ok(transaction_count) => {
502 self.optimizer
503 .report(index, now.elapsed().as_millis() as u64);
504 Ok(transaction_count)
505 }
506 Err(e) => {
507 self.optimizer.report(index, u64::MAX);
508 Err(e.into())
509 }
510 }
511 }
512
513 fn poll_for_signature_confirmation(
515 &self,
516 signature: &Signature,
517 min_confirmed_blocks: usize,
518 ) -> TransportResult<usize> {
519 self.rpc_client()
520 .poll_for_signature_confirmation(signature, min_confirmed_blocks)
521 .map_err(|e| e.into())
522 }
523
524 fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
525 self.rpc_client()
526 .poll_for_signature(signature)
527 .map_err(|e| e.into())
528 }
529
530 fn get_latest_blockhash(&self) -> TransportResult<Hash> {
531 let (blockhash, _) =
532 self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
533 Ok(blockhash)
534 }
535
536 fn get_latest_blockhash_with_commitment(
537 &self,
538 commitment_config: CommitmentConfig,
539 ) -> TransportResult<(Hash, u64)> {
540 let index = self.optimizer.experiment();
541 let now = Instant::now();
542 match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
543 Ok((blockhash, last_valid_block_height)) => {
544 self.optimizer
545 .report(index, now.elapsed().as_millis() as u64);
546 Ok((blockhash, last_valid_block_height))
547 }
548 Err(e) => {
549 self.optimizer.report(index, u64::MAX);
550 Err(e.into())
551 }
552 }
553 }
554
555 fn is_blockhash_valid(
556 &self,
557 blockhash: &Hash,
558 commitment_config: CommitmentConfig,
559 ) -> TransportResult<bool> {
560 self.rpc_client()
561 .is_blockhash_valid(blockhash, commitment_config)
562 .map_err(|e| e.into())
563 }
564
565 fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
566 self.rpc_client()
567 .get_fee_for_message(message)
568 .map_err(|e| e.into())
569 }
570}
571
572#[allow(deprecated)]
573impl<P, M, C> AsyncClient for ThinClient<P, M, C>
574where
575 P: ConnectionPool<NewConnectionConfig = C>,
576 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
577 C: NewConnectionConfig,
578{
579 fn async_send_versioned_transaction(
580 &self,
581 transaction: VersionedTransaction,
582 ) -> TransportResult<Signature> {
583 let conn = self.connection_cache.get_connection(self.tpu_addr());
584 let wire_transaction =
585 bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
586 conn.send_data(&wire_transaction)?;
587 Ok(transaction.signatures[0])
588 }
589
590 fn async_send_versioned_transaction_batch(
591 &self,
592 batch: Vec<VersionedTransaction>,
593 ) -> TransportResult<()> {
594 let conn = self.connection_cache.get_connection(self.tpu_addr());
595 let buffers = batch
596 .into_par_iter()
597 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
598 .collect::<Vec<_>>();
599 conn.send_data_batch(&buffers)?;
600 Ok(())
601 }
602}
603
/// Returns `(smallest value, index of its first occurrence)` for `array`.
///
/// An empty slice yields `(u64::MAX, 0)`; a slice containing only `u64::MAX` also
/// yields `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .copied()
        .enumerate()
        // `min_by_key` keeps the first of several equal minima.
        .min_by_key(|&(_, time)| time)
        .map_or((u64::MAX, 0), |(index, time)| (time, index))
}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the experiment / report / best cycle of `ClientOptimizer`.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Claim the experiment indices concurrently: every index must be handed out
        // exactly once no matter how the workers interleave.
        let mut claimed: Vec<usize> = (0..NUM_CLIENTS)
            .into_par_iter()
            .map(|_| optimizer.experiment())
            .collect();
        claimed.sort_unstable();
        assert_eq!(claimed, (0..NUM_CLIENTS).collect::<Vec<_>>());

        // Report in index order. `report` drops non-failure samples once the last
        // index has reported, so reporting from racing workers could leave some slots
        // at `u64::MAX` and make the assertions below depend on thread scheduling.
        for index in claimed {
            optimizer.report(index, (NUM_CLIENTS - index) as u64);
        }

        // The experiment is over: a slower sample must not change the selection.
        let index = optimizer.experiment();
        optimizer.report(index, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // A failure of the best client moves the selection to the next fastest one.
        optimizer.report(optimizer.best(), u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}