safecoin_client/
thin_client.rs

1//! The `thin_client` module is a client-side object that interfaces with
2//! a server-side TPU.  Client code should use this object instead of writing
3//! messages to the network directly. The binary encoding of its messages are
4//! unstable and may change in future releases.
5
6use {
7    crate::{
8        connection_cache::ConnectionCache, rpc_client::RpcClient,
9        rpc_config::RpcProgramAccountsConfig, rpc_response::Response,
10        tpu_connection::TpuConnection,
11    },
12    log::*,
13    solana_sdk::{
14        account::Account,
15        client::{AsyncClient, Client, SyncClient},
16        clock::{Slot, MAX_PROCESSING_AGE},
17        commitment_config::CommitmentConfig,
18        epoch_info::EpochInfo,
19        fee_calculator::{FeeCalculator, FeeRateGovernor},
20        hash::Hash,
21        instruction::Instruction,
22        message::Message,
23        pubkey::Pubkey,
24        signature::{Keypair, Signature, Signer},
25        signers::Signers,
26        system_instruction,
27        timing::duration_as_ms,
28        transaction::{self, Transaction, VersionedTransaction},
29        transport::Result as TransportResult,
30    },
31    std::{
32        io,
33        net::SocketAddr,
34        sync::{
35            atomic::{AtomicBool, AtomicUsize, Ordering},
36            Arc, RwLock,
37        },
38        time::{Duration, Instant},
39    },
40};
41
42struct ClientOptimizer {
43    cur_index: AtomicUsize,
44    experiment_index: AtomicUsize,
45    experiment_done: AtomicBool,
46    times: RwLock<Vec<u64>>,
47    num_clients: usize,
48}
49
50fn min_index(array: &[u64]) -> (u64, usize) {
51    let mut min_time = std::u64::MAX;
52    let mut min_index = 0;
53    for (i, time) in array.iter().enumerate() {
54        if *time < min_time {
55            min_time = *time;
56            min_index = i;
57        }
58    }
59    (min_time, min_index)
60}
61
62impl ClientOptimizer {
63    fn new(num_clients: usize) -> Self {
64        Self {
65            cur_index: AtomicUsize::new(0),
66            experiment_index: AtomicUsize::new(0),
67            experiment_done: AtomicBool::new(false),
68            times: RwLock::new(vec![std::u64::MAX; num_clients]),
69            num_clients,
70        }
71    }
72
73    fn experiment(&self) -> usize {
74        if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
75            let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
76            if old < self.num_clients {
77                old
78            } else {
79                self.best()
80            }
81        } else {
82            self.best()
83        }
84    }
85
86    fn report(&self, index: usize, time_ms: u64) {
87        if self.num_clients > 1
88            && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
89        {
90            trace!(
91                "report {} with {} exp: {}",
92                index,
93                time_ms,
94                self.experiment_index.load(Ordering::Relaxed)
95            );
96
97            self.times.write().unwrap()[index] = time_ms;
98
99            if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
100                let times = self.times.read().unwrap();
101                let (min_time, min_index) = min_index(&times);
102                trace!(
103                    "done experimenting min: {} time: {} times: {:?}",
104                    min_index,
105                    min_time,
106                    times
107                );
108
109                // Only 1 thread should grab the num_clients-1 index, so this should be ok.
110                self.cur_index.store(min_index, Ordering::Relaxed);
111                self.experiment_done.store(true, Ordering::Relaxed);
112            }
113        }
114    }
115
116    fn best(&self) -> usize {
117        self.cur_index.load(Ordering::Relaxed)
118    }
119}
120
121/// An object for querying and sending transactions to the network.
122pub struct ThinClient {
123    rpc_clients: Vec<RpcClient>,
124    tpu_addrs: Vec<SocketAddr>,
125    optimizer: ClientOptimizer,
126    connection_cache: Arc<ConnectionCache>,
127}
128
129impl ThinClient {
130    /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
131    /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
132    /// (currently hardcoded to UDP)
133    pub fn new(
134        rpc_addr: SocketAddr,
135        tpu_addr: SocketAddr,
136        connection_cache: Arc<ConnectionCache>,
137    ) -> Self {
138        Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
139    }
140
141    pub fn new_socket_with_timeout(
142        rpc_addr: SocketAddr,
143        tpu_addr: SocketAddr,
144        timeout: Duration,
145        connection_cache: Arc<ConnectionCache>,
146    ) -> Self {
147        let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
148        Self::new_from_client(rpc_client, tpu_addr, connection_cache)
149    }
150
151    fn new_from_client(
152        rpc_client: RpcClient,
153        tpu_addr: SocketAddr,
154        connection_cache: Arc<ConnectionCache>,
155    ) -> Self {
156        Self {
157            rpc_clients: vec![rpc_client],
158            tpu_addrs: vec![tpu_addr],
159            optimizer: ClientOptimizer::new(0),
160            connection_cache,
161        }
162    }
163
164    pub fn new_from_addrs(
165        rpc_addrs: Vec<SocketAddr>,
166        tpu_addrs: Vec<SocketAddr>,
167        connection_cache: Arc<ConnectionCache>,
168    ) -> Self {
169        assert!(!rpc_addrs.is_empty());
170        assert_eq!(rpc_addrs.len(), tpu_addrs.len());
171
172        let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
173        let optimizer = ClientOptimizer::new(rpc_clients.len());
174        Self {
175            rpc_clients,
176            tpu_addrs,
177            optimizer,
178            connection_cache,
179        }
180    }
181
182    fn tpu_addr(&self) -> &SocketAddr {
183        &self.tpu_addrs[self.optimizer.best()]
184    }
185
186    pub fn rpc_client(&self) -> &RpcClient {
187        &self.rpc_clients[self.optimizer.best()]
188    }
189
190    /// Retry a sending a signed Transaction to the server for processing.
191    pub fn retry_transfer_until_confirmed(
192        &self,
193        keypair: &Keypair,
194        transaction: &mut Transaction,
195        tries: usize,
196        min_confirmed_blocks: usize,
197    ) -> TransportResult<Signature> {
198        self.send_and_confirm_transaction(&[keypair], transaction, tries, min_confirmed_blocks)
199    }
200
201    /// Retry sending a signed Transaction with one signing Keypair to the server for processing.
202    pub fn retry_transfer(
203        &self,
204        keypair: &Keypair,
205        transaction: &mut Transaction,
206        tries: usize,
207    ) -> TransportResult<Signature> {
208        self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
209    }
210
211    pub fn send_and_confirm_transaction<T: Signers>(
212        &self,
213        keypairs: &T,
214        transaction: &mut Transaction,
215        tries: usize,
216        pending_confirmations: usize,
217    ) -> TransportResult<Signature> {
218        for x in 0..tries {
219            let now = Instant::now();
220            let mut num_confirmed = 0;
221            let mut wait_time = MAX_PROCESSING_AGE;
222            // resend the same transaction until the transaction has no chance of succeeding
223            let wire_transaction =
224                bincode::serialize(&transaction).expect("transaction serialization failed");
225            while now.elapsed().as_secs() < wait_time as u64 {
226                if num_confirmed == 0 {
227                    let conn = self.connection_cache.get_connection(self.tpu_addr());
228                    // Send the transaction if there has been no confirmation (e.g. the first time)
229                    conn.send_wire_transaction(&wire_transaction)?;
230                }
231
232                if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
233                    &transaction.signatures[0],
234                    pending_confirmations,
235                ) {
236                    num_confirmed = confirmed_blocks;
237                    if confirmed_blocks >= pending_confirmations {
238                        return Ok(transaction.signatures[0]);
239                    }
240                    // Since network has seen the transaction, wait longer to receive
241                    // all pending confirmations. Resending the transaction could result into
242                    // extra transaction fees
243                    wait_time = wait_time.max(
244                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
245                    );
246                }
247            }
248            info!("{} tries failed transfer to {}", x, self.tpu_addr());
249            let blockhash = self.get_latest_blockhash()?;
250            transaction.sign(keypairs, blockhash);
251        }
252        Err(io::Error::new(
253            io::ErrorKind::Other,
254            format!("retry_transfer failed in {} retries", tries),
255        )
256        .into())
257    }
258
259    pub fn poll_get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
260        self.poll_get_balance_with_commitment(pubkey, CommitmentConfig::default())
261    }
262
263    pub fn poll_get_balance_with_commitment(
264        &self,
265        pubkey: &Pubkey,
266        commitment_config: CommitmentConfig,
267    ) -> TransportResult<u64> {
268        self.rpc_client()
269            .poll_get_balance_with_commitment(pubkey, commitment_config)
270            .map_err(|e| e.into())
271    }
272
273    pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option<u64>) -> Option<u64> {
274        self.rpc_client().wait_for_balance_with_commitment(
275            pubkey,
276            expected_balance,
277            CommitmentConfig::default(),
278        )
279    }
280
281    pub fn get_program_accounts_with_config(
282        &self,
283        pubkey: &Pubkey,
284        config: RpcProgramAccountsConfig,
285    ) -> TransportResult<Vec<(Pubkey, Account)>> {
286        self.rpc_client()
287            .get_program_accounts_with_config(pubkey, config)
288            .map_err(|e| e.into())
289    }
290
291    pub fn wait_for_balance_with_commitment(
292        &self,
293        pubkey: &Pubkey,
294        expected_balance: Option<u64>,
295        commitment_config: CommitmentConfig,
296    ) -> Option<u64> {
297        self.rpc_client().wait_for_balance_with_commitment(
298            pubkey,
299            expected_balance,
300            commitment_config,
301        )
302    }
303
304    pub fn poll_for_signature_with_commitment(
305        &self,
306        signature: &Signature,
307        commitment_config: CommitmentConfig,
308    ) -> TransportResult<()> {
309        self.rpc_client()
310            .poll_for_signature_with_commitment(signature, commitment_config)
311            .map_err(|e| e.into())
312    }
313
314    pub fn get_num_blocks_since_signature_confirmation(
315        &mut self,
316        sig: &Signature,
317    ) -> TransportResult<usize> {
318        self.rpc_client()
319            .get_num_blocks_since_signature_confirmation(sig)
320            .map_err(|e| e.into())
321    }
322}
323
324impl Client for ThinClient {
325    fn tpu_addr(&self) -> String {
326        self.tpu_addr().to_string()
327    }
328}
329
330impl SyncClient for ThinClient {
331    fn send_and_confirm_message<T: Signers>(
332        &self,
333        keypairs: &T,
334        message: Message,
335    ) -> TransportResult<Signature> {
336        let blockhash = self.get_latest_blockhash()?;
337        let mut transaction = Transaction::new(keypairs, message, blockhash);
338        let signature = self.send_and_confirm_transaction(keypairs, &mut transaction, 5, 0)?;
339        Ok(signature)
340    }
341
342    fn send_and_confirm_instruction(
343        &self,
344        keypair: &Keypair,
345        instruction: Instruction,
346    ) -> TransportResult<Signature> {
347        let message = Message::new(&[instruction], Some(&keypair.pubkey()));
348        self.send_and_confirm_message(&[keypair], message)
349    }
350
351    fn transfer_and_confirm(
352        &self,
353        lamports: u64,
354        keypair: &Keypair,
355        pubkey: &Pubkey,
356    ) -> TransportResult<Signature> {
357        let transfer_instruction =
358            system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
359        self.send_and_confirm_instruction(keypair, transfer_instruction)
360    }
361
362    fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult<Option<Vec<u8>>> {
363        Ok(self.rpc_client().get_account_data(pubkey).ok())
364    }
365
366    fn get_account(&self, pubkey: &Pubkey) -> TransportResult<Option<Account>> {
367        let account = self.rpc_client().get_account(pubkey);
368        match account {
369            Ok(value) => Ok(Some(value)),
370            Err(_) => Ok(None),
371        }
372    }
373
374    fn get_account_with_commitment(
375        &self,
376        pubkey: &Pubkey,
377        commitment_config: CommitmentConfig,
378    ) -> TransportResult<Option<Account>> {
379        self.rpc_client()
380            .get_account_with_commitment(pubkey, commitment_config)
381            .map_err(|e| e.into())
382            .map(|r| r.value)
383    }
384
385    fn get_balance(&self, pubkey: &Pubkey) -> TransportResult<u64> {
386        self.rpc_client().get_balance(pubkey).map_err(|e| e.into())
387    }
388
389    fn get_balance_with_commitment(
390        &self,
391        pubkey: &Pubkey,
392        commitment_config: CommitmentConfig,
393    ) -> TransportResult<u64> {
394        self.rpc_client()
395            .get_balance_with_commitment(pubkey, commitment_config)
396            .map_err(|e| e.into())
397            .map(|r| r.value)
398    }
399
400    fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> TransportResult<u64> {
401        self.rpc_client()
402            .get_minimum_balance_for_rent_exemption(data_len)
403            .map_err(|e| e.into())
404    }
405
406    fn get_recent_blockhash(&self) -> TransportResult<(Hash, FeeCalculator)> {
407        #[allow(deprecated)]
408        let (blockhash, fee_calculator, _last_valid_slot) =
409            self.get_recent_blockhash_with_commitment(CommitmentConfig::default())?;
410        Ok((blockhash, fee_calculator))
411    }
412
413    fn get_recent_blockhash_with_commitment(
414        &self,
415        commitment_config: CommitmentConfig,
416    ) -> TransportResult<(Hash, FeeCalculator, Slot)> {
417        let index = self.optimizer.experiment();
418        let now = Instant::now();
419        #[allow(deprecated)]
420        let recent_blockhash =
421            self.rpc_clients[index].get_recent_blockhash_with_commitment(commitment_config);
422        match recent_blockhash {
423            Ok(Response { value, .. }) => {
424                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
425                Ok((value.0, value.1, value.2))
426            }
427            Err(e) => {
428                self.optimizer.report(index, std::u64::MAX);
429                Err(e.into())
430            }
431        }
432    }
433
434    fn get_fee_calculator_for_blockhash(
435        &self,
436        blockhash: &Hash,
437    ) -> TransportResult<Option<FeeCalculator>> {
438        #[allow(deprecated)]
439        self.rpc_client()
440            .get_fee_calculator_for_blockhash(blockhash)
441            .map_err(|e| e.into())
442    }
443
444    fn get_fee_rate_governor(&self) -> TransportResult<FeeRateGovernor> {
445        #[allow(deprecated)]
446        self.rpc_client()
447            .get_fee_rate_governor()
448            .map_err(|e| e.into())
449            .map(|r| r.value)
450    }
451
452    fn get_signature_status(
453        &self,
454        signature: &Signature,
455    ) -> TransportResult<Option<transaction::Result<()>>> {
456        let status = self
457            .rpc_client()
458            .get_signature_status(signature)
459            .map_err(|err| {
460                io::Error::new(
461                    io::ErrorKind::Other,
462                    format!("send_transaction failed with error {:?}", err),
463                )
464            })?;
465        Ok(status)
466    }
467
468    fn get_signature_status_with_commitment(
469        &self,
470        signature: &Signature,
471        commitment_config: CommitmentConfig,
472    ) -> TransportResult<Option<transaction::Result<()>>> {
473        let status = self
474            .rpc_client()
475            .get_signature_status_with_commitment(signature, commitment_config)
476            .map_err(|err| {
477                io::Error::new(
478                    io::ErrorKind::Other,
479                    format!("send_transaction failed with error {:?}", err),
480                )
481            })?;
482        Ok(status)
483    }
484
485    fn get_slot(&self) -> TransportResult<u64> {
486        self.get_slot_with_commitment(CommitmentConfig::default())
487    }
488
489    fn get_slot_with_commitment(
490        &self,
491        commitment_config: CommitmentConfig,
492    ) -> TransportResult<u64> {
493        let slot = self
494            .rpc_client()
495            .get_slot_with_commitment(commitment_config)
496            .map_err(|err| {
497                io::Error::new(
498                    io::ErrorKind::Other,
499                    format!("send_transaction failed with error {:?}", err),
500                )
501            })?;
502        Ok(slot)
503    }
504
505    fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
506        self.rpc_client().get_epoch_info().map_err(|e| e.into())
507    }
508
509    fn get_transaction_count(&self) -> TransportResult<u64> {
510        let index = self.optimizer.experiment();
511        let now = Instant::now();
512        match self.rpc_client().get_transaction_count() {
513            Ok(transaction_count) => {
514                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
515                Ok(transaction_count)
516            }
517            Err(e) => {
518                self.optimizer.report(index, std::u64::MAX);
519                Err(e.into())
520            }
521        }
522    }
523
524    fn get_transaction_count_with_commitment(
525        &self,
526        commitment_config: CommitmentConfig,
527    ) -> TransportResult<u64> {
528        let index = self.optimizer.experiment();
529        let now = Instant::now();
530        match self
531            .rpc_client()
532            .get_transaction_count_with_commitment(commitment_config)
533        {
534            Ok(transaction_count) => {
535                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
536                Ok(transaction_count)
537            }
538            Err(e) => {
539                self.optimizer.report(index, std::u64::MAX);
540                Err(e.into())
541            }
542        }
543    }
544
545    /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
546    fn poll_for_signature_confirmation(
547        &self,
548        signature: &Signature,
549        min_confirmed_blocks: usize,
550    ) -> TransportResult<usize> {
551        self.rpc_client()
552            .poll_for_signature_confirmation(signature, min_confirmed_blocks)
553            .map_err(|e| e.into())
554    }
555
556    fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> {
557        self.rpc_client()
558            .poll_for_signature(signature)
559            .map_err(|e| e.into())
560    }
561
562    fn get_new_blockhash(&self, blockhash: &Hash) -> TransportResult<(Hash, FeeCalculator)> {
563        #[allow(deprecated)]
564        self.rpc_client()
565            .get_new_blockhash(blockhash)
566            .map_err(|e| e.into())
567    }
568
569    fn get_latest_blockhash(&self) -> TransportResult<Hash> {
570        let (blockhash, _) =
571            self.get_latest_blockhash_with_commitment(CommitmentConfig::default())?;
572        Ok(blockhash)
573    }
574
575    fn get_latest_blockhash_with_commitment(
576        &self,
577        commitment_config: CommitmentConfig,
578    ) -> TransportResult<(Hash, u64)> {
579        let index = self.optimizer.experiment();
580        let now = Instant::now();
581        match self.rpc_clients[index].get_latest_blockhash_with_commitment(commitment_config) {
582            Ok((blockhash, last_valid_block_height)) => {
583                self.optimizer.report(index, duration_as_ms(&now.elapsed()));
584                Ok((blockhash, last_valid_block_height))
585            }
586            Err(e) => {
587                self.optimizer.report(index, std::u64::MAX);
588                Err(e.into())
589            }
590        }
591    }
592
593    fn is_blockhash_valid(
594        &self,
595        blockhash: &Hash,
596        commitment_config: CommitmentConfig,
597    ) -> TransportResult<bool> {
598        self.rpc_client()
599            .is_blockhash_valid(blockhash, commitment_config)
600            .map_err(|e| e.into())
601    }
602
603    fn get_fee_for_message(&self, message: &Message) -> TransportResult<u64> {
604        self.rpc_client()
605            .get_fee_for_message(message)
606            .map_err(|e| e.into())
607    }
608}
609
610impl AsyncClient for ThinClient {
611    fn async_send_versioned_transaction(
612        &self,
613        transaction: VersionedTransaction,
614    ) -> TransportResult<Signature> {
615        let conn = self.connection_cache.get_connection(self.tpu_addr());
616        conn.serialize_and_send_transaction(&transaction)?;
617        Ok(transaction.signatures[0])
618    }
619
620    fn async_send_versioned_transaction_batch(
621        &self,
622        batch: Vec<VersionedTransaction>,
623    ) -> TransportResult<()> {
624        let conn = self.connection_cache.get_connection(self.tpu_addr());
625        conn.par_serialize_and_send_transaction_batch(&batch[..])?;
626        Ok(())
627    }
628}
629
630#[cfg(test)]
631mod tests {
632    use {super::*, rayon::prelude::*};
633
634    #[test]
635    fn test_client_optimizer() {
636        solana_logger::setup();
637
638        const NUM_CLIENTS: usize = 5;
639        let optimizer = ClientOptimizer::new(NUM_CLIENTS);
640        (0..NUM_CLIENTS).into_par_iter().for_each(|_| {
641            let index = optimizer.experiment();
642            optimizer.report(index, (NUM_CLIENTS - index) as u64);
643        });
644
645        let index = optimizer.experiment();
646        optimizer.report(index, 50);
647        assert_eq!(optimizer.best(), NUM_CLIENTS - 1);
648
649        optimizer.report(optimizer.best(), std::u64::MAX);
650        assert_eq!(optimizer.best(), NUM_CLIENTS - 2);
651    }
652}