solana_thin_client/
thin_client.rs

//! The `thin_client` module is a client-side object that interfaces with
//! a server-side TPU. Client code should use this object instead of writing
//! messages to the network directly. The binary encoding of its messages is
//! unstable and may change in future releases.
5
6use {
7    log::*,
8    rayon::iter::{IntoParallelIterator, ParallelIterator},
9    solana_connection_cache::{
10        client_connection::ClientConnection,
11        connection_cache::{
12            ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
13        },
14    },
15    solana_rpc_client::rpc_client::RpcClient,
16    solana_rpc_client_api::config::RpcProgramAccountsConfig,
17    solana_sdk::{
18        account::Account,
19        client::{AsyncClient, Client, SyncClient},
20        clock::MAX_PROCESSING_AGE,
21        commitment_config::CommitmentConfig,
22        epoch_info::EpochInfo,
23        hash::Hash,
24        instruction::Instruction,
25        message::Message,
26        pubkey::Pubkey,
27        signature::{Keypair, Signature, Signer},
28        signers::Signers,
29        system_instruction,
30        transaction::{self, Transaction, VersionedTransaction},
31        transport::Result as TransportResult,
32    },
33    std::{
34        io,
35        net::SocketAddr,
36        sync::{
37            atomic::{AtomicBool, AtomicUsize, Ordering},
38            Arc, RwLock,
39        },
40        time::{Duration, Instant},
41    },
42};
43
/// Bookkeeping used to pick the fastest of several interchangeable clients.
///
/// Clients are identified purely by index; the optimizer never touches the
/// clients themselves.
struct ClientOptimizer {
    /// Index currently returned by `best()`.
    cur_index: AtomicUsize,
    /// Counter used to hand out each client index once for a timing probe.
    experiment_index: AtomicUsize,
    /// Set once a best index has been selected from the recorded times.
    experiment_done: AtomicBool,
    /// Last reported time per client index; `u64::MAX` until a time is recorded
    /// (and again after a failure is reported).
    times: RwLock<Vec<u64>>,
    /// Number of clients taking part in the experiment.
    num_clients: usize,
}
51
52impl ClientOptimizer {
53    fn new(num_clients: usize) -> Self {
54        Self {
55            cur_index: AtomicUsize::new(0),
56            experiment_index: AtomicUsize::new(0),
57            experiment_done: AtomicBool::new(false),
58            times: RwLock::new(vec![u64::MAX; num_clients]),
59            num_clients,
60        }
61    }
62
63    fn experiment(&self) -> usize {
64        if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
65            let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
66            if old < self.num_clients {
67                old
68            } else {
69                self.best()
70            }
71        } else {
72            self.best()
73        }
74    }
75
76    fn report(&self, index: usize, time_ms: u64) {
77        if self.num_clients > 1
78            && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == u64::MAX)
79        {
80            trace!(
81                "report {} with {} exp: {}",
82                index,
83                time_ms,
84                self.experiment_index.load(Ordering::Relaxed)
85            );
86
87            self.times.write().unwrap()[index] = time_ms;
88
89            if index == (self.num_clients - 1) || time_ms == u64::MAX {
90                let times = self.times.read().unwrap();
91                let (min_time, min_index) = min_index(&times);
92                trace!(
93                    "done experimenting min: {} time: {} times: {:?}",
94                    min_index,
95                    min_time,
96                    times
97                );
98
99                // Only 1 thread should grab the num_clients-1 index, so this should be ok.
100                self.cur_index.store(min_index, Ordering::Relaxed);
101                self.experiment_done.store(true, Ordering::Relaxed);
102            }
103        }
104    }
105
106    fn best(&self) -> usize {
107        self.cur_index.load(Ordering::Relaxed)
108    }
109}
110
111/// An object for querying and sending transactions to the network.
112#[deprecated(since = "2.0.0", note = "Use [RpcClient] or [TpuClient] instead.")]
113pub struct ThinClient<
114    P, // ConnectionPool
115    M, // ConnectionManager
116    C, // NewConnectionConfig
117> {
118    rpc_clients: Vec<RpcClient>,
119    tpu_addrs: Vec<SocketAddr>,
120    optimizer: ClientOptimizer,
121    connection_cache: Arc<ConnectionCache<P, M, C>>,
122}
123
124#[allow(deprecated)]
125impl<P, M, C> ThinClient<P, M, C>
126where
127    P: ConnectionPool<NewConnectionConfig = C>,
128    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
129    C: NewConnectionConfig,
130{
131    /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
132    /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
133    /// (currently hardcoded to UDP)
134    pub fn new(
135        rpc_addr: SocketAddr,
136        tpu_addr: SocketAddr,
137        connection_cache: Arc<ConnectionCache<P, M, C>>,
138    ) -> Self {
139        Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
140    }
141
142    pub fn new_socket_with_timeout(
143        rpc_addr: SocketAddr,
144        tpu_addr: SocketAddr,
145        timeout: Duration,
146        connection_cache: Arc<ConnectionCache<P, M, C>>,
147    ) -> Self {
148        let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
149        Self::new_from_client(rpc_client, tpu_addr, connection_cache)
150    }
151
152    fn new_from_client(
153        rpc_client: RpcClient,
154        tpu_addr: SocketAddr,
155        connection_cache: Arc<ConnectionCache<P, M, C>>,
156    ) -> Self {
157        Self {
158            rpc_clients: vec![rpc_client],
159            tpu_addrs: vec![tpu_addr],
160            optimizer: ClientOptimizer::new(0),
161            connection_cache,
162        }
163    }
164
165    pub fn new_from_addrs(
166        rpc_addrs: Vec<SocketAddr>,
167        tpu_addrs: Vec<SocketAddr>,
168        connection_cache: Arc<ConnectionCache<P, M, C>>,
169    ) -> Self {
170        assert!(!rpc_addrs.is_empty());
171        assert_eq!(rpc_addrs.len(), tpu_addrs.len());
172
173        let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
174        let optimizer = ClientOptimizer::new(rpc_clients.len());
175        Self {
176            rpc_clients,
177            tpu_addrs,
178            optimizer,
179            connection_cache,
180        }
181    }
182
183    fn tpu_addr(&self) -> &SocketAddr {
184        &self.tpu_addrs[self.optimizer.best()]
185    }
186
187    pub fn rpc_client(&self) -> &RpcClient {
188        &self.rpc_clients[self.optimizer.best()]
189    }
190
191    /// Retry a sending a signed Transaction to the server for processing.
192    pub fn retry_transfer_until_confirmed(
193        &self,
194        keypair: &Keypair,
195        transaction: &mut Transaction,
196        tries: usize,
197        min_confirmed_blocks: usize,
198    ) -> TransportResult<Signature> {
199        self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
200    }
201
202    /// Retry sending a signed Transaction with one signing Keypair to the server for processing.
203    pub fn retry_transfer(
204        &self,
205        keypair: &Keypair,
206        transaction: &mut Transaction,
207        tries: usize,
208    ) -> TransportResult<Signature> {
209        self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
210    }
211
212    pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
213        &self,
214        keypairs: &T,
215        transaction: &mut Transaction,
216        tries: usize,
217        pending_confirmations: usize,
218    ) -> TransportResult<Signature> {
219        for x in 0..tries {
220            let now = Instant::now();
221            let mut num_confirmed = 0;
222            let mut wait_time = MAX_PROCESSING_AGE;
223            // resend the same transaction until the transaction has no chance of succeeding
224            let wire_transaction =
225                bincode::serialize(&transaction).expect("transaction serialization failed");
226            while now.elapsed().as_secs() < wait_time as u64 {
227                if num_confirmed == 0 {
228                    let conn = self.connection_cache.get_connection(self.tpu_addr());
229                    // Send the transaction if there has been no confirmation (e.g. the first time)
230                    #[allow(clippy::needless_borrow)]
231                    conn.send_data(&wire_transaction)?;
232                }
233
234                if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
235                    &transaction.signatures[0],
236                    pending_confirmations,
237                ) {
238                    num_confirmed = confirmed_blocks;
239                    if confirmed_blocks >= pending_confirmations {
240                        return Ok(transaction.signatures[0]);
241                    }
242                    // Since network has seen the transaction, wait longer to receive
243                    // all pending confirmations. Resending the transaction could result into
244                    // extra transaction fees
245                    wait_time = wait_time.max(
246                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
247                    );
248                }
249            }
250            info!("{} tries failed transfer to {}", x, self.tpu_addr());
251            let blockhash = self.get_latest_blockhash()?;
252            transaction.sign(keypairs, blockhash);
253        }
254        Err(io::Error::new(
255            io::ErrorKind::Other,
256            format!("retry_transfer failed in {tries} retries"),
257        )
258        .into())
259    }
260
261    pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
262        self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
263    }
264
265    pub fn poll_get_balance_with_commitment(
266        &self,
267        pubkey: &Pubkey,
268        commitment_config: CommitmentConfig,
269    ) -> TransportResult<u64> {
270        self.rpc_client()
271            .poll_get_balance_with_commitment(pubkey, commitment_config)
272            .map_err(|e| e.into())
273    }
274
275    pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
276        self.rpc_client().wait_for_balance_with_commitment(
277            pubkey,
278            expected_balance,
279            CommitmentConfig::default(),
280        )
281    }
282
283    pub fn get_program_accounts_with_config(
284        &self,
285        pubkey: &Pubkey,
286        config: RpcProgramAccountsConfig,
287    ) -> TransportResult<Vec<(Pubkey, Account)>> {
288        self.rpc_client()
289            .get_program_accounts_with_config(pubkey, config)
290            .map_err(|e| e.into())
291    }
292
293    pub fn wait_for_balance_with_commitment(
294        &self,
295        pubkey: &Pubkey,
296        expected_balance: Option<u64>,
297        commitment_config: CommitmentConfig,
298    ) -> Option<u64> {
299        self.rpc_client().wait_for_balance_with_commitment(
300            pubkey,
301            expected_balance,
302            commitment_config,
303        )
304    }
305
306    pub fn poll_for_signature_with_commitment(
307        &self,
308        signature: &Signature,
309        commitment_config: CommitmentConfig,
310    ) -> TransportResult<()> {
311        self.rpc_client()
312            .poll_for_signature_with_commitment(signature, commitment_config)
313            .map_err(|e| e.into())
314    }
315
316    pub fn get_num_blocks_since_signature_confirmation(
317        &mut self,
318        sig: &Signature,
319    ) -> TransportResult<usize> {
320        self.rpc_client()
321            .get_num_blocks_since_signature_confirmation(sig)
322            .map_err(|e| e.into())
323    }
324}
325
326#[allow(deprecated)]
327impl<P, M, C> Client for ThinClient<P, M, C>
328where
329    P: ConnectionPool<NewConnectionConfig = C>,
330    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
331    C: NewConnectionConfig,
332{
333    fn tpu_addr(&self) -> String {
334        self.tpu_addr().to_string()
335    }
336}
337
338#[allow(deprecated)]
339impl<P, M, C> SyncClient for ThinClient<P, M, C>
340where
341    P: ConnectionPool<NewConnectionConfig = C>,
342    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
343    C: NewConnectionConfig,
344{
345    fn send_and_confirm_message<T: Signers + ?Sized>(
346        &self,
347        keypairs: &T,
348        message: Message,
349    ) -> TransportResult<Signature> {
350        let blockhash = self.get_latest_blockhash()?;
351        let mut transaction = Transaction::new(keypairs, message, blockhash);
352        let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
353        Ok(signature)
354    }
355
356    fn send_and_confirm_instruction(
357        &self,
358        keypair: &Keypair,
359        instruction: Instruction,
360    ) -> TransportResult<Signature> {
361        let message = Message::new(&[instruction], Some(&keypair.pubkey()));
362        self.send_and_confirm_message(&[keypair], message)
363    }
364
365    fn transfer_and_confirm(
366        &self,
367        lamports: u64,
368        keypair: &Keypair,
369        pubkey: &Pubkey,
370    ) -> TransportResult<Signature> {
371        let transfer_instruction =
372            system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
373        self.send_and_confirm_instruction(keypair, transfer_instruction)
374    }
375
376    fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
377        Ok(self.rpc_client().get_account_data(pubkey).ok())
378    }
379
380    fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
381        let account = self.rpc_client().get_account(pubkey);
382        match account {
383            Ok(value) => Ok(Some(value)),
384            Err(_) => Ok(None),
385        }
386    }
387
388    fn get_account_with_commitment(
389        &self,
390        pubkey: &Pubkey,
391        commitment_config: CommitmentConfig,
392    ) -> TransportResult<Option<Account>> {
393        self.rpc_client()
394            .get_account_with_commitment(pubkey, commitment_config)
395            .map_err(|e| e.into())
396            .map(|r| r.value)
397    }
398
399    fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
400        self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
401    }
402
403    fn get_balance_with_commitment(
404        &self,
405        pubkey: &Pubkey,
406        commitment_config: CommitmentConfig,
407    ) -> TransportResult<u64> {
408        self.rpc_client()
409            .get_balance_with_commitment(pubkey, commitment_config)
410            .map_err(|e| e.into())
411            .map(|r| r.value)
412    }
413
414    fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
415        self.rpc_client()
416            .get_minimum_balance_for_rent_exemption(data_len)
417            .map_err(|e| e.into())
418    }
419
420    fn get_signature_status(
421        &self,
422        signature: &Signature,
423    ) -> TransportResult<Option<transaction::Result<()>>> {
424        let status = self
425            .rpc_client()
426            .get_signature_status(signature)
427            .map_err(|err| {
428                io::Error::new(
429                    io::ErrorKind::Other,
430                    format!("send_transaction failed with error {err:?}"),
431                )
432            })?;
433        Ok(status)
434    }
435
436    fn get_signature_status_with_commitment(
437        &self,
438        signature: &Signature,
439        commitment_config: CommitmentConfig,
440    ) -> TransportResult<Option<transaction::Result<()>>> {
441        let status = self
442            .rpc_client()
443            .get_signature_status_with_commitment(signature, commitment_config)
444            .map_err(|err| {
445                io::Error::new(
446                    io::ErrorKind::Other,
447                    format!("send_transaction failed with error {err:?}"),
448                )
449            })?;
450        Ok(status)
451    }
452
453    fn get_slot(&self) -> TransportResult<u64> {
454        self.get_slot_with_commitment(CommitmentConfig::default())
455    }
456
457    fn get_slot_with_commitment(
458        &self,
459        commitment_config: CommitmentConfig,
460    ) -> TransportResult<u64> {
461        let slot = self
462            .rpc_client()
463            .get_slot_with_commitment(commitment_config)
464            .map_err(|err| {
465                io::Error::new(
466                    io::ErrorKind::Other,
467                    format!("send_transaction failed with error {err:?}"),
468                )
469            })?;
470        Ok(slot)
471    }
472
473    fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
474        self.rpc_client().get_epoch_info().map_err(|e| e.into())
475    }
476
477    fn get_transaction_count(&self) -> TransportResult<u64> {
478        let index = self.optimizer.experiment();
479        let now = Instant::now();
480        match self.rpc_client().get_transaction_count() {
481            Ok(transaction_count) => {
482                self.optimizer
483                    .report(index, now.elapsed().as_millis() as u64);
484                Ok(transaction_count)
485            }
486            Err(e) => {
487                self.optimizer.report(index, u64::MAX);
488                Err(e.into())
489            }
490        }
491    }
492
493    fn get_transaction_count_with_commitment(
494        &self,
495        commitment_config: CommitmentConfig,
496    ) -> TransportResult<u64> {
497        let index = self.optimizer.experiment();
498        let now = Instant::now();
499        match self
500            .rpc_client()
501            .get_transaction_count_with_commitment(commitment_config)
502        {
503            Ok(transaction_count) => {
504                self.optimizer
505                    .report(index, now.elapsed().as_millis() as u64);
506                Ok(transaction_count)
507            }
508            Err(e) => {
509                self.optimizer.report(index, u64::MAX);
510                Err(e.into())
511            }
512        }
513    }
514
515    /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
516    fn poll_for_signature_confirmation(
517        &self,
518        signature: &Signature,
519        min_confirmed_blocks: usize,
520    ) -> TransportResult<usize> {
521        self.rpc_client()
522            .poll_for_signature_confirmation(signature, min_confirmed_blocks)
523            .map_err(|e| e.into())
524    }
525
526    fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
527        self.rpc_client()
528            .poll_for_signature(signature)
529            .map_err(|e| e.into())
530    }
531
532    fn get_latest_blockhash(&self) -> TransportResult<Hash> {
533        let (blockhash, _) =
534            self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
535        Ok(blockhash)
536    }
537
538    fn get_latest_blockhash_with_commitment(
539        &self,
540        commitment_config: CommitmentConfig,
541    ) -> TransportResult<(Hash, u64)> {
542        let index = self.optimizer.experiment();
543        let now = Instant::now();
544        match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
545            Ok((blockhash, last_valid_block_height)) => {
546                self.optimizer
547                    .report(index, now.elapsed().as_millis() as u64);
548                Ok((blockhash, last_valid_block_height))
549            }
550            Err(e) => {
551                self.optimizer.report(index, u64::MAX);
552                Err(e.into())
553            }
554        }
555    }
556
557    fn is_blockhash_valid(
558        &self,
559        blockhash: &Hash,
560        commitment_config: CommitmentConfig,
561    ) -> TransportResult<bool> {
562        self.rpc_client()
563            .is_blockhash_valid(blockhash, commitment_config)
564            .map_err(|e| e.into())
565    }
566
567    fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
568        self.rpc_client()
569            .get_fee_for_message(message)
570            .map_err(|e| e.into())
571    }
572}
573
574#[allow(deprecated)]
575impl<P, M, C> AsyncClient for ThinClient<P, M, C>
576where
577    P: ConnectionPool<NewConnectionConfig = C>,
578    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
579    C: NewConnectionConfig,
580{
581    fn async_send_versioned_transaction(
582        &self,
583        transaction: VersionedTransaction,
584    ) -> TransportResult<Signature> {
585        let conn = self.connection_cache.get_connection(self.tpu_addr());
586        let wire_transaction =
587            bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
588        conn.send_data(&wire_transaction)?;
589        Ok(transaction.signatures[0])
590    }
591
592    fn async_send_versioned_transaction_batch(
593        &self,
594        batch: Vec<VersionedTransaction>,
595    ) -> TransportResult<()> {
596        let conn = self.connection_cache.get_connection(self.tpu_addr());
597        let buffers = batch
598            .into_par_iter()
599            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
600            .collect::<Vec<_>>();
601        conn.send_data_batch(&buffers)?;
602        Ok(())
603    }
604}
605
/// Returns `(smallest value, index of its first occurrence)` for `array`.
///
/// If `array` is empty, or no element is below `u64::MAX`, the result is
/// `(u64::MAX, 0)`.
fn min_index(array: &[u64]) -> (u64, usize) {
    array
        .iter()
        .copied()
        .enumerate()
        .fold((u64::MAX, 0), |(best_time, best_pos), (pos, time)| {
            // Strict comparison keeps the earliest index on ties.
            if time < best_time {
                (time, pos)
            } else {
                (best_time, best_pos)
            }
        })
}
617
#[cfg(test)]
mod tests {
    use super::*;

    /// Probes every client once in parallel, then checks winner selection and failover.
    #[test]
    fn test_client_optimizer() {
        solana_logger::setup();

        const NUM_CLIENTS: usize = 5;
        let optimizer = ClientOptimizer::new(NUM_CLIENTS);

        // Client `i` reports a time of `NUM_CLIENTS - i`, so the highest index is fastest.
        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
            let probe = optimizer.experiment();
            let simulated_time = (NUM_CLIENTS - probe) as u64;
            optimizer.report(probe, simulated_time);
        });

        // After the experiment, a slower non-failure report must leave the winner unchanged.
        let chosen = optimizer.experiment();
        optimizer.report(chosen, 50);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);

        // Reporting a failure for the winner should move selection to the runner-up.
        // NOTE(review): this appears to depend on scheduling. `report` ignores non-failure
        // times once the experiment is done, so if index NUM_CLIENTS - 2 reports after
        // index NUM_CLIENTS - 1 its time is never recorded. Confirm before relying on it.
        let winner = optimizer.best();
        optimizer.report(winner, u64::MAX);
        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
    }
}