solana_thin_client/
thin_client.rs

1//! The `thin_client` module is a client-side object that interfaces with
2//! a server-side TPU.  Client code should use this object instead of writing
//! messages to the network directly. The binary encoding of its messages is
4//! unstable and may change in future releases.
5
6use {
7    log::*,
8    rayon::iter::{IntoParallelIterator, ParallelIterator},
9    solana_account::Account,
10    solana_client_traits::{AsyncClient, Client, SyncClient},
11    solana_clock::MAX_PROCESSING_AGE,
12    solana_commitment_config::CommitmentConfig,
13    solana_connection_cache::{
14        client_connection::ClientConnection,
15        connection_cache::{
16            ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
17        },
18    },
19    solana_epoch_info::EpochInfo,
20    solana_hash::Hash,
21    solana_instruction::Instruction,
22    solana_keypair::Keypair,
23    solana_message::Message,
24    solana_pubkey::Pubkey,
25    solana_rpc_client::rpc_client::RpcClient,
26    solana_rpc_client_api::config::RpcProgramAccountsConfig,
27    solana_signature::Signature,
28    solana_signer::{signers::Signers, Signer},
29    solana_system_interface::instruction::transfer,
30    solana_transaction::{versioned::VersionedTransaction, Transaction},
31    solana_transaction_error::{TransactionResult, TransportResult},
32    std::{
33        io,
34        net::SocketAddr,
35        sync::{
36            atomic::{AtomicBool, AtomicUsize, Ordering},
37            Arc, RwLock,
38        },
39        time::{Duration, Instant},
40    },
41};
42
/// Bookkeeping used to pick one client index out of `num_clients` based on reported timings.
///
/// `experiment` hands out each index once, `report` records a timing for an index and
/// selects the index with the smallest recorded time, and `best` returns the current selection.
struct ClientOptimizer {
    /// Index returned by `best`; overwritten by `report` when it (re)selects a client.
    cur_index: AtomicUsize,
    /// Next index `experiment` will hand out while it is still below `num_clients`.
    experiment_index: AtomicUsize,
    /// Set by `report` once a selection has been made; afterwards only `u64::MAX`
    /// reports are processed.
    experiment_done: AtomicBool,
    /// Most recent `time_ms` passed to `report` for each index; starts at `u64::MAX`.
    times: RwLock<Vec<u64>>,
    /// Number of client slots this optimizer was created for.
    num_clients: usize,
}
50
51impl ClientOptimizer {
52    fn new(num_clients: usize) -> Self {
53        Self {
54            cur_index: AtomicUsize::new(0),
55            experiment_index: AtomicUsize::new(0),
56            experiment_done: AtomicBool::new(false),
57            times: RwLock::new(vec![u64::MAX; num_clients]),
58            num_clients,
59        }
60    }
61
62    fn experiment(&self) -> usize {
63        if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
64            let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
65            if old < self.num_clients {
66                old
67            } else {
68                self.best()
69            }
70        } else {
71            self.best()
72        }
73    }
74
75    fn report(&self, index: usize, time_ms: u64) {
76        if self.num_clients > 1
77            && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == u64::MAX)
78        {
79            trace!(
80                "report {} with {} exp: {}",
81                index,
82                time_ms,
83                self.experiment_index.load(Ordering::Relaxed)
84            );
85
86            self.times.write().unwrap()[index] = time_ms;
87
88            if index == (self.num_clients - 1) || time_ms == u64::MAX {
89                let times = self.times.read().unwrap();
90                let (min_time, min_index) = min_index(&times);
91                trace!(
92                    "done experimenting min: {} time: {} times: {:?}",
93                    min_index,
94                    min_time,
95                    times
96                );
97
98                // Only 1 thread should grab the num_clients-1 index, so this should be ok.
99                self.cur_index.store(min_index, Ordering::Relaxed);
100                self.experiment_done.store(true, Ordering::Relaxed);
101            }
102        }
103    }
104
105    fn best(&self) -> usize {
106        self.cur_index.load(Ordering::Relaxed)
107    }
108}
109
/// An object for querying and sending transactions to the network.
///
/// `rpc_clients` and `tpu_addrs` are indexed in parallel; the index in use is the one
/// returned by `optimizer.best()`.
#[deprecated(since = "2.0.0", note = "Use [RpcClient] or [TpuClient] instead.")]
pub struct ThinClient<
    P, // ConnectionPool
    M, // ConnectionManager
    C, // NewConnectionConfig
> {
    /// RPC clients, one per server.
    rpc_clients: Vec<RpcClient>,
    /// TPU socket addresses, one per server, in the same order as `rpc_clients`.
    tpu_addrs: Vec<SocketAddr>,
    /// Chooses which index of `rpc_clients` / `tpu_addrs` is used.
    optimizer: ClientOptimizer,
    /// Source of the connections used to send transaction data to a TPU address.
    connection_cache: Arc<ConnectionCache<P, M, C>>,
}
122
123#[allow(deprecated)]
124impl<P, M, C> ThinClient<P, M, C>
125where
126    P: ConnectionPool<NewConnectionConfig = C>,
127    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
128    C: NewConnectionConfig,
129{
130    /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
131    /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
132    /// (currently hardcoded to UDP)
133    pub fn new(
134        rpc_addr: SocketAddr,
135        tpu_addr: SocketAddr,
136        connection_cache: Arc<ConnectionCache<P, M, C>>,
137    ) -> Self {
138        Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
139    }
140
141    pub fn new_socket_with_timeout(
142        rpc_addr: SocketAddr,
143        tpu_addr: SocketAddr,
144        timeout: Duration,
145        connection_cache: Arc<ConnectionCache<P, M, C>>,
146    ) -> Self {
147        let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
148        Self::new_from_client(rpc_client, tpu_addr, connection_cache)
149    }
150
151    fn new_from_client(
152        rpc_client: RpcClient,
153        tpu_addr: SocketAddr,
154        connection_cache: Arc<ConnectionCache<P, M, C>>,
155    ) -> Self {
156        Self {
157            rpc_clients: vec![rpc_client],
158            tpu_addrs: vec![tpu_addr],
159            optimizer: ClientOptimizer::new(0),
160            connection_cache,
161        }
162    }
163
164    pub fn new_from_addrs(
165        rpc_addrs: Vec<SocketAddr>,
166        tpu_addrs: Vec<SocketAddr>,
167        connection_cache: Arc<ConnectionCache<P, M, C>>,
168    ) -> Self {
169        assert!(!rpc_addrs.is_empty());
170        assert_eq!(rpc_addrs.len(), tpu_addrs.len());
171
172        let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
173        let optimizer = ClientOptimizer::new(rpc_clients.len());
174        Self {
175            rpc_clients,
176            tpu_addrs,
177            optimizer,
178            connection_cache,
179        }
180    }
181
182    fn tpu_addr(&self) -> &SocketAddr {
183        &self.tpu_addrs[self.optimizer.best()]
184    }
185
186    pub fn rpc_client(&self) -> &RpcClient {
187        &self.rpc_clients[self.optimizer.best()]
188    }
189
190    /// Retry a sending a signed Transaction to the server for processing.
191    pub fn retry_transfer_until_confirmed(
192        &self,
193        keypair: &Keypair,
194        transaction: &mut Transaction,
195        tries: usize,
196        min_confirmed_blocks: usize,
197    ) -> TransportResult<Signature> {
198        self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
199    }
200
201    /// Retry sending a signed Transaction with one signing Keypair to the server for processing.
202    pub fn retry_transfer(
203        &self,
204        keypair: &Keypair,
205        transaction: &mut Transaction,
206        tries: usize,
207    ) -> TransportResult<Signature> {
208        self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
209    }
210
211    pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
212        &self,
213        keypairs: &T,
214        transaction: &mut Transaction,
215        tries: usize,
216        pending_confirmations: usize,
217    ) -> TransportResult<Signature> {
218        for x in 0..tries {
219            let now = Instant::now();
220            let mut num_confirmed = 0;
221            let mut wait_time = MAX_PROCESSING_AGE;
222            // resend the same transaction until the transaction has no chance of succeeding
223            let wire_transaction =
224                bincode::serialize(&transaction).expect("transaction serialization failed");
225            while now.elapsed().as_secs() < wait_time as u64 {
226                if num_confirmed == 0 {
227                    let conn = self.connection_cache.get_connection(self.tpu_addr());
228                    // Send the transaction if there has been no confirmation (e.g. the first time)
229                    #[allow(clippy::needless_borrow)]
230                    conn.send_data(&wire_transaction)?;
231                }
232
233                if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
234                    &transaction.signatures[0],
235                    pending_confirmations,
236                ) {
237                    num_confirmed = confirmed_blocks;
238                    if confirmed_blocks >= pending_confirmations {
239                        return Ok(transaction.signatures[0]);
240                    }
241                    // Since network has seen the transaction, wait longer to receive
242                    // all pending confirmations. Resending the transaction could result into
243                    // extra transaction fees
244                    wait_time = wait_time.max(
245                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
246                    );
247                }
248            }
249            info!("{} tries failed transfer to {}", x, self.tpu_addr());
250            let blockhash = self.get_latest_blockhash()?;
251            transaction.sign(keypairs, blockhash);
252        }
253        Err(io::Error::new(
254            io::ErrorKind::Other,
255            format!("retry_transfer failed in {tries} retries"),
256        )
257        .into())
258    }
259
260    pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
261        self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
262    }
263
264    pub fn poll_get_balance_with_commitment(
265        &self,
266        pubkey: &Pubkey,
267        commitment_config: CommitmentConfig,
268    ) -> TransportResult<u64> {
269        self.rpc_client()
270            .poll_get_balance_with_commitment(pubkey, commitment_config)
271            .map_err(|e| e.into())
272    }
273
274    pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
275        self.rpc_client().wait_for_balance_with_commitment(
276            pubkey,
277            expected_balance,
278            CommitmentConfig::default(),
279        )
280    }
281
282    pub fn get_program_accounts_with_config(
283        &self,
284        pubkey: &Pubkey,
285        config: RpcProgramAccountsConfig,
286    ) -> TransportResult<Vec<(Pubkey, Account)>> {
287        self.rpc_client()
288            .get_program_accounts_with_config(pubkey, config)
289            .map_err(|e| e.into())
290    }
291
292    pub fn wait_for_balance_with_commitment(
293        &self,
294        pubkey: &Pubkey,
295        expected_balance: Option<u64>,
296        commitment_config: CommitmentConfig,
297    ) -> Option<u64> {
298        self.rpc_client().wait_for_balance_with_commitment(
299            pubkey,
300            expected_balance,
301            commitment_config,
302        )
303    }
304
305    pub fn poll_for_signature_with_commitment(
306        &self,
307        signature: &Signature,
308        commitment_config: CommitmentConfig,
309    ) -> TransportResult<()> {
310        self.rpc_client()
311            .poll_for_signature_with_commitment(signature, commitment_config)
312            .map_err(|e| e.into())
313    }
314
315    pub fn get_num_blocks_since_signature_confirmation(
316        &mut self,
317        sig: &Signature,
318    ) -> TransportResult<usize> {
319        self.rpc_client()
320            .get_num_blocks_since_signature_confirmation(sig)
321            .map_err(|e| e.into())
322    }
323}
324
#[allow(deprecated)]
impl<P, M, C> Client for ThinClient<P, M, C>
where
    P: ConnectionPool<NewConnectionConfig = C>,
    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
    C: NewConnectionConfig,
{
    /// Returns the currently selected TPU address rendered as a string.
    fn tpu_addr(&self) -> String {
        // Method-call syntax resolves to the inherent `ThinClient::tpu_addr` (which returns
        // `&SocketAddr`), not to this trait method, so this does not recurse.
        self.tpu_addr().to_string()
    }
}
336
337#[allow(deprecated)]
338impl<P, M, C> SyncClient for ThinClient<P, M, C>
339where
340    P: ConnectionPool<NewConnectionConfig = C>,
341    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
342    C: NewConnectionConfig,
343{
344    fn send_and_confirm_message<T: Signers + ?Sized>(
345        &self,
346        keypairs: &T,
347        message: Message,
348    ) -> TransportResult<Signature> {
349        let blockhash = self.get_latest_blockhash()?;
350        let mut transaction = Transaction::new(keypairs, message, blockhash);
351        let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
352        Ok(signature)
353    }
354
355    fn send_and_confirm_instruction(
356        &self,
357        keypair: &Keypair,
358        instruction: Instruction,
359    ) -> TransportResult<Signature> {
360        let message = Message::new(&[instruction], Some(&keypair.pubkey()));
361        self.send_and_confirm_message(&[keypair], message)
362    }
363
364    fn transfer_and_confirm(
365        &self,
366        lamports: u64,
367        keypair: &Keypair,
368        pubkey: &Pubkey,
369    ) -> TransportResult<Signature> {
370        let transfer_instruction = transfer(&keypair.pubkey(), pubkey, lamports);
371        self.send_and_confirm_instruction(keypair, transfer_instruction)
372    }
373
374    fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
375        Ok(self.rpc_client().get_account_data(pubkey).ok())
376    }
377
378    fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
379        let account = self.rpc_client().get_account(pubkey);
380        match account {
381            Ok(value) => Ok(Some(value)),
382            Err(_) => Ok(None),
383        }
384    }
385
386    fn get_account_with_commitment(
387        &self,
388        pubkey: &Pubkey,
389        commitment_config: CommitmentConfig,
390    ) -> TransportResult<Option<Account>> {
391        self.rpc_client()
392            .get_account_with_commitment(pubkey, commitment_config)
393            .map_err(|e| e.into())
394            .map(|r| r.value)
395    }
396
397    fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
398        self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
399    }
400
401    fn get_balance_with_commitment(
402        &self,
403        pubkey: &Pubkey,
404        commitment_config: CommitmentConfig,
405    ) -> TransportResult<u64> {
406        self.rpc_client()
407            .get_balance_with_commitment(pubkey, commitment_config)
408            .map_err(|e| e.into())
409            .map(|r| r.value)
410    }
411
412    fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
413        self.rpc_client()
414            .get_minimum_balance_for_rent_exemption(data_len)
415            .map_err(|e| e.into())
416    }
417
418    fn get_signature_status(
419        &self,
420        signature: &Signature,
421    ) -> TransportResult<Option<TransactionResult<()>>> {
422        let status = self
423            .rpc_client()
424            .get_signature_status(signature)
425            .map_err(|err| {
426                io::Error::new(
427                    io::ErrorKind::Other,
428                    format!("send_transaction failed with error {err:?}"),
429                )
430            })?;
431        Ok(status)
432    }
433
434    fn get_signature_status_with_commitment(
435        &self,
436        signature: &Signature,
437        commitment_config: CommitmentConfig,
438    ) -> TransportResult<Option<TransactionResult<()>>> {
439        let status = self
440            .rpc_client()
441            .get_signature_status_with_commitment(signature, commitment_config)
442            .map_err(|err| {
443                io::Error::new(
444                    io::ErrorKind::Other,
445                    format!("send_transaction failed with error {err:?}"),
446                )
447            })?;
448        Ok(status)
449    }
450
451    fn get_slot(&self) -> TransportResult<u64> {
452        self.get_slot_with_commitment(CommitmentConfig::default())
453    }
454
455    fn get_slot_with_commitment(
456        &self,
457        commitment_config: CommitmentConfig,
458    ) -> TransportResult<u64> {
459        let slot = self
460            .rpc_client()
461            .get_slot_with_commitment(commitment_config)
462            .map_err(|err| {
463                io::Error::new(
464                    io::ErrorKind::Other,
465                    format!("send_transaction failed with error {err:?}"),
466                )
467            })?;
468        Ok(slot)
469    }
470
471    fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
472        self.rpc_client().get_epoch_info().map_err(|e| e.into())
473    }
474
475    fn get_transaction_count(&self) -> TransportResult<u64> {
476        let index = self.optimizer.experiment();
477        let now = Instant::now();
478        match self.rpc_client().get_transaction_count() {
479            Ok(transaction_count) => {
480                self.optimizer
481                    .report(index, now.elapsed().as_millis() as u64);
482                Ok(transaction_count)
483            }
484            Err(e) => {
485                self.optimizer.report(index, u64::MAX);
486                Err(e.into())
487            }
488        }
489    }
490
491    fn get_transaction_count_with_commitment(
492        &self,
493        commitment_config: CommitmentConfig,
494    ) -> TransportResult<u64> {
495        let index = self.optimizer.experiment();
496        let now = Instant::now();
497        match self
498            .rpc_client()
499            .get_transaction_count_with_commitment(commitment_config)
500        {
501            Ok(transaction_count) => {
502                self.optimizer
503                    .report(index, now.elapsed().as_millis() as u64);
504                Ok(transaction_count)
505            }
506            Err(e) => {
507                self.optimizer.report(index, u64::MAX);
508                Err(e.into())
509            }
510        }
511    }
512
513    /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
514    fn poll_for_signature_confirmation(
515        &self,
516        signature: &Signature,
517        min_confirmed_blocks: usize,
518    ) -> TransportResult<usize> {
519        self.rpc_client()
520            .poll_for_signature_confirmation(signature, min_confirmed_blocks)
521            .map_err(|e| e.into())
522    }
523
524    fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
525        self.rpc_client()
526            .poll_for_signature(signature)
527            .map_err(|e| e.into())
528    }
529
530    fn get_latest_blockhash(&self) -> TransportResult<Hash> {
531        let (blockhash, _) =
532            self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
533        Ok(blockhash)
534    }
535
536    fn get_latest_blockhash_with_commitment(
537        &self,
538        commitment_config: CommitmentConfig,
539    ) -> TransportResult<(Hash, u64)> {
540        let index = self.optimizer.experiment();
541        let now = Instant::now();
542        match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
543            Ok((blockhash, last_valid_block_height)) => {
544                self.optimizer
545                    .report(index, now.elapsed().as_millis() as u64);
546                Ok((blockhash, last_valid_block_height))
547            }
548            Err(e) => {
549                self.optimizer.report(index, u64::MAX);
550                Err(e.into())
551            }
552        }
553    }
554
555    fn is_blockhash_valid(
556        &self,
557        blockhash: &Hash,
558        commitment_config: CommitmentConfig,
559    ) -> TransportResult<bool> {
560        self.rpc_client()
561            .is_blockhash_valid(blockhash, commitment_config)
562            .map_err(|e| e.into())
563    }
564
565    fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
566        self.rpc_client()
567            .get_fee_for_message(message)
568            .map_err(|e| e.into())
569    }
570}
571
572#[allow(deprecated)]
573impl<P, M, C> AsyncClient for ThinClient<P, M, C>
574where
575    P: ConnectionPool<NewConnectionConfig = C>,
576    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
577    C: NewConnectionConfig,
578{
579    fn async_send_versioned_transaction(
580        &self,
581        transaction: VersionedTransaction,
582    ) -> TransportResult<Signature> {
583        let conn = self.connection_cache.get_connection(self.tpu_addr());
584        let wire_transaction =
585            bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
586        conn.send_data(&wire_transaction)?;
587        Ok(transaction.signatures[0])
588    }
589
590    fn async_send_versioned_transaction_batch(
591        &self,
592        batch: Vec<VersionedTransaction>,
593    ) -> TransportResult<()> {
594        let conn = self.connection_cache.get_connection(self.tpu_addr());
595        let buffers = batch
596            .into_par_iter()
597            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
598            .collect::<Vec<_>>();
599        conn.send_data_batch(&buffers)?;
600        Ok(())
601    }
602}
603
/// Returns `(smallest value, its index)` for `array`.
///
/// On ties the first occurrence wins. An empty slice, or one holding only `u64::MAX`,
/// yields `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .copied()
        .enumerate()
        // `min_by_key` keeps the first of several equal minima, like a strict `<` scan.
        .min_by_key(|&(_, time)| time)
        .map_or((u64::MAX, 0), |(index, time)| (time, index))
}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises `ClientOptimizer`: the experiment phase, ignoring of ordinary reports after
    /// a selection was made, and re-selection when the best client reports `u64::MAX`.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);
        // Each handed-out index reports a time of `NUM_CLIENTS - index`, so the highest
        // index carries the smallest time.
        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
            let index = optimizer.experiment();
            optimizer.report(index, (NUM_CLIENTS - index) as u64);
        });

        // All indices were handed out, so `experiment` now returns the best index; a
        // report that is not `u64::MAX` is not expected to change the selection.
        let index = optimizer.experiment();
        optimizer.report(index, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // A `u64::MAX` report for the best index triggers a re-selection; the client with
        // the next-smallest recorded time is expected to take over.
        optimizer.report(optimizer.best(), u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}