1use {
7 log::*,
8 rayon::iter::{IntoParallelIterator, ParallelIterator},
9 solana_connection_cache::{
10 client_connection::ClientConnection,
11 connection_cache::{
12 ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
13 },
14 },
15 solana_rpc_client::rpc_client::RpcClient,
16 solana_rpc_client_api::config::RpcProgramAccountsConfig,
17 solana_sdk::{
18 account::Account,
19 client::{AsyncClient, Client, SyncClient},
20 clock::MAX_PROCESSING_AGE,
21 commitment_config::CommitmentConfig,
22 epoch_info::EpochInfo,
23 hash::Hash,
24 instruction::Instruction,
25 message::Message,
26 pubkey::Pubkey,
27 signature::{Keypair, Signature, Signer},
28 signers::Signers,
29 system_instruction,
30 transaction::{self, Transaction, VersionedTransaction},
31 transport::Result as TransportResult,
32 },
33 std::{
34 io,
35 net::SocketAddr,
36 sync::{
37 atomic::{AtomicBool, AtomicUsize, Ordering},
38 Arc, RwLock,
39 },
40 time::{Duration, Instant},
41 },
42};
43
/// Bookkeeping used to pick the fastest of several clients.
///
/// Each client is identified by its index. The optimizer hands out indices to
/// be timed, records the time reported for each of them, and remembers the
/// index with the smallest recorded time.
struct ClientOptimizer {
    /// Number of clients taking part in the selection.
    num_clients: usize,
    /// Most recently reported time for each client, indexed by client.
    /// Reports pass milliseconds, with `u64::MAX` standing in for a failure.
    times: RwLock<Vec<u64>>,
    /// Counter used to hand out each client index once for timing.
    experiment_index: AtomicUsize,
    /// Set once a best client has been chosen from the recorded times.
    experiment_done: AtomicBool,
    /// Index of the client currently considered the best.
    cur_index: AtomicUsize,
}
51
52impl ClientOptimizer {
53 fn new(num_clients: usize) -> Self {
54 Self {
55 cur_index: AtomicUsize::new(0),
56 experiment_index: AtomicUsize::new(0),
57 experiment_done: AtomicBool::new(false),
58 times: RwLock::new(vec![u64::MAX; num_clients]),
59 num_clients,
60 }
61 }
62
63 fn experiment(&self) -> usize {
64 if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
65 let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
66 if old < self.num_clients {
67 old
68 } else {
69 self.best()
70 }
71 } else {
72 self.best()
73 }
74 }
75
76 fn report(&self, index: usize, time_ms: u64) {
77 if self.num_clients > 1
78 && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == u64::MAX)
79 {
80 trace!(
81 "report {} with {} exp: {}",
82 index,
83 time_ms,
84 self.experiment_index.load(Ordering::Relaxed)
85 );
86
87 self.times.write().unwrap()[index] = time_ms;
88
89 if index == (self.num_clients - 1) || time_ms == u64::MAX {
90 let times = self.times.read().unwrap();
91 let (min_time, min_index) = min_index(×);
92 trace!(
93 "done experimenting min: {} time: {} times: {:?}",
94 min_index,
95 min_time,
96 times
97 );
98
99 self.cur_index.store(min_index, Ordering::Relaxed);
101 self.experiment_done.store(true, Ordering::Relaxed);
102 }
103 }
104 }
105
106 fn best(&self) -> usize {
107 self.cur_index.load(Ordering::Relaxed)
108 }
109}
110
111#[deprecated(since = "2.0.0", note = "Use [RpcClient] or [TpuClient] instead.")]
113pub struct ThinClient<
114 P, M, C, > {
118 rpc_clients: Vec<RpcClient>,
119 tpu_addrs: Vec<SocketAddr>,
120 optimizer: ClientOptimizer,
121 connection_cache: Arc<ConnectionCache<P, M, C>>,
122}
123
124#[allow(deprecated)]
125impl<P, M, C> ThinClient<P, M, C>
126where
127 P: ConnectionPool<NewConnectionConfig = C>,
128 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
129 C: NewConnectionConfig,
130{
131 pub fn new(
135 rpc_addr: SocketAddr,
136 tpu_addr: SocketAddr,
137 connection_cache: Arc<ConnectionCache<P, M, C>>,
138 ) -> Self {
139 Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
140 }
141
142 pub fn new_socket_with_timeout(
143 rpc_addr: SocketAddr,
144 tpu_addr: SocketAddr,
145 timeout: Duration,
146 connection_cache: Arc<ConnectionCache<P, M, C>>,
147 ) -> Self {
148 let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
149 Self::new_from_client(rpc_client, tpu_addr, connection_cache)
150 }
151
152 fn new_from_client(
153 rpc_client: RpcClient,
154 tpu_addr: SocketAddr,
155 connection_cache: Arc<ConnectionCache<P, M, C>>,
156 ) -> Self {
157 Self {
158 rpc_clients: vec![rpc_client],
159 tpu_addrs: vec![tpu_addr],
160 optimizer: ClientOptimizer::new(0),
161 connection_cache,
162 }
163 }
164
165 pub fn new_from_addrs(
166 rpc_addrs: Vec<SocketAddr>,
167 tpu_addrs: Vec<SocketAddr>,
168 connection_cache: Arc<ConnectionCache<P, M, C>>,
169 ) -> Self {
170 assert!(!rpc_addrs.is_empty());
171 assert_eq!(rpc_addrs.len(), tpu_addrs.len());
172
173 let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
174 let optimizer = ClientOptimizer::new(rpc_clients.len());
175 Self {
176 rpc_clients,
177 tpu_addrs,
178 optimizer,
179 connection_cache,
180 }
181 }
182
183 fn tpu_addr(&self) -> &SocketAddr {
184 &self.tpu_addrs[self.optimizer.best()]
185 }
186
187 pub fn rpc_client(&self) -> &RpcClient {
188 &self.rpc_clients[self.optimizer.best()]
189 }
190
191 pub fn retry_transfer_until_confirmed(
193 &self,
194 keypair: &Keypair,
195 transaction: &mut Transaction,
196 tries: usize,
197 min_confirmed_blocks: usize,
198 ) -> TransportResult<Signature> {
199 self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
200 }
201
202 pub fn retry_transfer(
204 &self,
205 keypair: &Keypair,
206 transaction: &mut Transaction,
207 tries: usize,
208 ) -> TransportResult<Signature> {
209 self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
210 }
211
212 pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
213 &self,
214 keypairs: &T,
215 transaction: &mut Transaction,
216 tries: usize,
217 pending_confirmations: usize,
218 ) -> TransportResult<Signature> {
219 for x in 0..tries {
220 let now = Instant::now();
221 let mut num_confirmed = 0;
222 let mut wait_time = MAX_PROCESSING_AGE;
223 let wire_transaction =
225 bincode::serialize(&transaction).expect("transaction serialization failed");
226 while now.elapsed().as_secs() < wait_time as u64 {
227 if num_confirmed == 0 {
228 let conn = self.connection_cache.get_connection(self.tpu_addr());
229 #[allow(clippy::needless_borrow)]
231 conn.send_data(&wire_transaction)?;
232 }
233
234 if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
235 &transaction.signatures[0],
236 pending_confirmations,
237 ) {
238 num_confirmed = confirmed_blocks;
239 if confirmed_blocks >= pending_confirmations {
240 return Ok(transaction.signatures[0]);
241 }
242 wait_time = wait_time.max(
246 MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
247 );
248 }
249 }
250 info!("{} tries failed transfer to {}", x, self.tpu_addr());
251 let blockhash = self.get_latest_blockhash()?;
252 transaction.sign(keypairs, blockhash);
253 }
254 Err(io::Error::new(
255 io::ErrorKind::Other,
256 format!("retry_transfer failed in {tries} retries"),
257 )
258 .into())
259 }
260
261 pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
262 self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
263 }
264
265 pub fn poll_get_balance_with_commitment(
266 &self,
267 pubkey: &Pubkey,
268 commitment_config: CommitmentConfig,
269 ) -> TransportResult<u64> {
270 self.rpc_client()
271 .poll_get_balance_with_commitment(pubkey, commitment_config)
272 .map_err(|e| e.into())
273 }
274
275 pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
276 self.rpc_client().wait_for_balance_with_commitment(
277 pubkey,
278 expected_balance,
279 CommitmentConfig::default(),
280 )
281 }
282
283 pub fn get_program_accounts_with_config(
284 &self,
285 pubkey: &Pubkey,
286 config: RpcProgramAccountsConfig,
287 ) -> TransportResult<Vec<(Pubkey, Account)>> {
288 self.rpc_client()
289 .get_program_accounts_with_config(pubkey, config)
290 .map_err(|e| e.into())
291 }
292
293 pub fn wait_for_balance_with_commitment(
294 &self,
295 pubkey: &Pubkey,
296 expected_balance: Option<u64>,
297 commitment_config: CommitmentConfig,
298 ) -> Option<u64> {
299 self.rpc_client().wait_for_balance_with_commitment(
300 pubkey,
301 expected_balance,
302 commitment_config,
303 )
304 }
305
306 pub fn poll_for_signature_with_commitment(
307 &self,
308 signature: &Signature,
309 commitment_config: CommitmentConfig,
310 ) -> TransportResult<()> {
311 self.rpc_client()
312 .poll_for_signature_with_commitment(signature, commitment_config)
313 .map_err(|e| e.into())
314 }
315
316 pub fn get_num_blocks_since_signature_confirmation(
317 &mut self,
318 sig: &Signature,
319 ) -> TransportResult<usize> {
320 self.rpc_client()
321 .get_num_blocks_since_signature_confirmation(sig)
322 .map_err(|e| e.into())
323 }
324}
325
326#[allow(deprecated)]
327impl<P, M, C> Client for ThinClient<P, M, C>
328where
329 P: ConnectionPool<NewConnectionConfig = C>,
330 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
331 C: NewConnectionConfig,
332{
333 fn tpu_addr(&self) -> String {
334 self.tpu_addr().to_string()
335 }
336}
337
338#[allow(deprecated)]
339impl<P, M, C> SyncClient for ThinClient<P, M, C>
340where
341 P: ConnectionPool<NewConnectionConfig = C>,
342 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
343 C: NewConnectionConfig,
344{
345 fn send_and_confirm_message<T: Signers + ?Sized>(
346 &self,
347 keypairs: &T,
348 message: Message,
349 ) -> TransportResult<Signature> {
350 let blockhash = self.get_latest_blockhash()?;
351 let mut transaction = Transaction::new(keypairs, message, blockhash);
352 let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
353 Ok(signature)
354 }
355
356 fn send_and_confirm_instruction(
357 &self,
358 keypair: &Keypair,
359 instruction: Instruction,
360 ) -> TransportResult<Signature> {
361 let message = Message::new(&[instruction], Some(&keypair.pubkey()));
362 self.send_and_confirm_message(&[keypair], message)
363 }
364
365 fn transfer_and_confirm(
366 &self,
367 lamports: u64,
368 keypair: &Keypair,
369 pubkey: &Pubkey,
370 ) -> TransportResult<Signature> {
371 let transfer_instruction =
372 system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
373 self.send_and_confirm_instruction(keypair, transfer_instruction)
374 }
375
376 fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
377 Ok(self.rpc_client().get_account_data(pubkey).ok())
378 }
379
380 fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
381 let account = self.rpc_client().get_account(pubkey);
382 match account {
383 Ok(value) => Ok(Some(value)),
384 Err(_) => Ok(None),
385 }
386 }
387
388 fn get_account_with_commitment(
389 &self,
390 pubkey: &Pubkey,
391 commitment_config: CommitmentConfig,
392 ) -> TransportResult<Option<Account>> {
393 self.rpc_client()
394 .get_account_with_commitment(pubkey, commitment_config)
395 .map_err(|e| e.into())
396 .map(|r| r.value)
397 }
398
399 fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
400 self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
401 }
402
403 fn get_balance_with_commitment(
404 &self,
405 pubkey: &Pubkey,
406 commitment_config: CommitmentConfig,
407 ) -> TransportResult<u64> {
408 self.rpc_client()
409 .get_balance_with_commitment(pubkey, commitment_config)
410 .map_err(|e| e.into())
411 .map(|r| r.value)
412 }
413
414 fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
415 self.rpc_client()
416 .get_minimum_balance_for_rent_exemption(data_len)
417 .map_err(|e| e.into())
418 }
419
420 fn get_signature_status(
421 &self,
422 signature: &Signature,
423 ) -> TransportResult<Option<transaction::Result<()>>> {
424 let status = self
425 .rpc_client()
426 .get_signature_status(signature)
427 .map_err(|err| {
428 io::Error::new(
429 io::ErrorKind::Other,
430 format!("send_transaction failed with error {err:?}"),
431 )
432 })?;
433 Ok(status)
434 }
435
436 fn get_signature_status_with_commitment(
437 &self,
438 signature: &Signature,
439 commitment_config: CommitmentConfig,
440 ) -> TransportResult<Option<transaction::Result<()>>> {
441 let status = self
442 .rpc_client()
443 .get_signature_status_with_commitment(signature, commitment_config)
444 .map_err(|err| {
445 io::Error::new(
446 io::ErrorKind::Other,
447 format!("send_transaction failed with error {err:?}"),
448 )
449 })?;
450 Ok(status)
451 }
452
453 fn get_slot(&self) -> TransportResult<u64> {
454 self.get_slot_with_commitment(CommitmentConfig::default())
455 }
456
457 fn get_slot_with_commitment(
458 &self,
459 commitment_config: CommitmentConfig,
460 ) -> TransportResult<u64> {
461 let slot = self
462 .rpc_client()
463 .get_slot_with_commitment(commitment_config)
464 .map_err(|err| {
465 io::Error::new(
466 io::ErrorKind::Other,
467 format!("send_transaction failed with error {err:?}"),
468 )
469 })?;
470 Ok(slot)
471 }
472
473 fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
474 self.rpc_client().get_epoch_info().map_err(|e| e.into())
475 }
476
477 fn get_transaction_count(&self) -> TransportResult<u64> {
478 let index = self.optimizer.experiment();
479 let now = Instant::now();
480 match self.rpc_client().get_transaction_count() {
481 Ok(transaction_count) => {
482 self.optimizer
483 .report(index, now.elapsed().as_millis() as u64);
484 Ok(transaction_count)
485 }
486 Err(e) => {
487 self.optimizer.report(index, u64::MAX);
488 Err(e.into())
489 }
490 }
491 }
492
493 fn get_transaction_count_with_commitment(
494 &self,
495 commitment_config: CommitmentConfig,
496 ) -> TransportResult<u64> {
497 let index = self.optimizer.experiment();
498 let now = Instant::now();
499 match self
500 .rpc_client()
501 .get_transaction_count_with_commitment(commitment_config)
502 {
503 Ok(transaction_count) => {
504 self.optimizer
505 .report(index, now.elapsed().as_millis() as u64);
506 Ok(transaction_count)
507 }
508 Err(e) => {
509 self.optimizer.report(index, u64::MAX);
510 Err(e.into())
511 }
512 }
513 }
514
515 fn poll_for_signature_confirmation(
517 &self,
518 signature: &Signature,
519 min_confirmed_blocks: usize,
520 ) -> TransportResult<usize> {
521 self.rpc_client()
522 .poll_for_signature_confirmation(signature, min_confirmed_blocks)
523 .map_err(|e| e.into())
524 }
525
526 fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
527 self.rpc_client()
528 .poll_for_signature(signature)
529 .map_err(|e| e.into())
530 }
531
532 fn get_latest_blockhash(&self) -> TransportResult<Hash> {
533 let (blockhash, _) =
534 self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
535 Ok(blockhash)
536 }
537
538 fn get_latest_blockhash_with_commitment(
539 &self,
540 commitment_config: CommitmentConfig,
541 ) -> TransportResult<(Hash, u64)> {
542 let index = self.optimizer.experiment();
543 let now = Instant::now();
544 match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
545 Ok((blockhash, last_valid_block_height)) => {
546 self.optimizer
547 .report(index, now.elapsed().as_millis() as u64);
548 Ok((blockhash, last_valid_block_height))
549 }
550 Err(e) => {
551 self.optimizer.report(index, u64::MAX);
552 Err(e.into())
553 }
554 }
555 }
556
557 fn is_blockhash_valid(
558 &self,
559 blockhash: &Hash,
560 commitment_config: CommitmentConfig,
561 ) -> TransportResult<bool> {
562 self.rpc_client()
563 .is_blockhash_valid(blockhash, commitment_config)
564 .map_err(|e| e.into())
565 }
566
567 fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
568 self.rpc_client()
569 .get_fee_for_message(message)
570 .map_err(|e| e.into())
571 }
572}
573
574#[allow(deprecated)]
575impl<P, M, C> AsyncClient for ThinClient<P, M, C>
576where
577 P: ConnectionPool<NewConnectionConfig = C>,
578 M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
579 C: NewConnectionConfig,
580{
581 fn async_send_versioned_transaction(
582 &self,
583 transaction: VersionedTransaction,
584 ) -> TransportResult<Signature> {
585 let conn = self.connection_cache.get_connection(self.tpu_addr());
586 let wire_transaction =
587 bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
588 conn.send_data(&wire_transaction)?;
589 Ok(transaction.signatures[0])
590 }
591
592 fn async_send_versioned_transaction_batch(
593 &self,
594 batch: Vec<VersionedTransaction>,
595 ) -> TransportResult<()> {
596 let conn = self.connection_cache.get_connection(self.tpu_addr());
597 let buffers = batch
598 .into_par_iter()
599 .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
600 .collect::<Vec<_>>();
601 conn.send_data_batch(&buffers)?;
602 Ok(())
603 }
604}
605
/// Returns `(smallest value, index of its first occurrence)` for `array`.
///
/// Only values strictly below `u64::MAX` can be selected, so an empty slice,
/// or one holding nothing but `u64::MAX`, yields `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .copied()
        .enumerate()
        .fold((u64::MAX, 0), |(best_time, best_index), (i, time)| {
            // Strict comparison keeps the earliest index when values tie.
            if time < best_time {
                (time, i)
            } else {
                (best_time, best_index)
            }
        })
}
617
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the optimizer: the experiment phase, the choice of the
    /// fastest client, and the re-selection after a failure report.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Time every client once, in parallel. Higher indices report lower
        // times, so the last client is the fastest.
        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
            let idx = optimizer.experiment();
            let elapsed_ms = (NUM_CLIENTS - idx) as u64;
            optimizer.report(idx, elapsed_ms);
        });

        // A further, slower report must not displace the chosen client.
        let chosen = optimizer.experiment();
        optimizer.report(chosen, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // A failure on the best client moves the choice to the runner-up.
        let failed = optimizer.best();
        optimizer.report(failed, u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}