solana_client/
transaction_executor.rs

1#![allow(clippy::arithmetic_side_effects)]
2use {
3    log::*,
4    solana_commitment_config::CommitmentConfig,
5    solana_measure::measure::Measure,
6    solana_rpc_client::rpc_client::RpcClient,
7    solana_signature::Signature,
8    solana_time_utils::timestamp,
9    solana_transaction::Transaction,
10    std::{
11        net::SocketAddr,
12        sync::{
13            atomic::{AtomicBool, AtomicU64, Ordering},
14            Arc, RwLock,
15        },
16        thread::{sleep, Builder, JoinHandle},
17        time::{Duration, Instant},
18    },
19};
20
/// Transactions that have been sent but not yet cleared by the background
/// status thread.
///
/// Each entry is `(signature, sent_timestamp, id)`:
/// * `signature` - signature returned by the RPC node when the transaction was sent,
/// * `sent_timestamp` - value of `timestamp()` captured right after the send
///   (presumably milliseconds; it is compared against a `30_000` threshold),
/// * `id` - executor-local id handed back to the caller of `push_transactions`.
type PendingQueue = Vec<(Signature, u64, u64)>;
23
/// Sends transactions through an RPC client and tracks them until they are
/// cleared by a background thread that polls their signature statuses.
pub struct TransactionExecutor {
    /// Handle of the background status-polling thread; joined in `close`.
    sig_clear_t: JoinHandle<()>,
    /// Sent-but-not-yet-cleared transactions, shared with the background thread.
    sigs: Arc<RwLock<PendingQueue>>,
    /// Ids of transactions the background thread removed from `sigs`
    /// (a status was returned for them, or they timed out); emptied by `drain_cleared`.
    cleared: Arc<RwLock<Vec<u64>>>,
    /// Set to `true` by `close` to ask the background thread to stop.
    exit: Arc<AtomicBool>,
    /// Source of the ids assigned to transactions in `push_transactions`.
    counter: AtomicU64,
    /// RPC client used to send transactions; a clone is used by the background
    /// thread to query signature statuses.
    client: Arc<RpcClient>,
}
32
33impl TransactionExecutor {
34    pub fn new(entrypoint_addr: SocketAddr) -> Self {
35        let client = Arc::new(RpcClient::new_socket_with_commitment(
36            entrypoint_addr,
37            CommitmentConfig::confirmed(),
38        ));
39        Self::new_with_rpc_client(client)
40    }
41
42    pub fn new_with_url<U: ToString>(url: U) -> Self {
43        let client = Arc::new(RpcClient::new_with_commitment(
44            url,
45            CommitmentConfig::confirmed(),
46        ));
47        Self::new_with_rpc_client(client)
48    }
49
50    pub fn new_with_rpc_client(client: Arc<RpcClient>) -> Self {
51        let sigs = Arc::new(RwLock::new(Vec::new()));
52        let cleared = Arc::new(RwLock::new(Vec::new()));
53        let exit = Arc::new(AtomicBool::new(false));
54        let sig_clear_t = Self::start_sig_clear_thread(exit.clone(), &sigs, &cleared, &client);
55        Self {
56            sigs,
57            cleared,
58            sig_clear_t,
59            exit,
60            counter: AtomicU64::new(0),
61            client,
62        }
63    }
64
65    pub fn num_outstanding(&self) -> usize {
66        self.sigs.read().unwrap().len()
67    }
68
69    pub fn push_transactions(&self, txs: Vec<Transaction>) -> Vec<u64> {
70        let mut ids = vec![];
71        let new_sigs = txs.into_iter().filter_map(|tx| {
72            let id = self.counter.fetch_add(1, Ordering::Relaxed);
73            ids.push(id);
74            match self.client.send_transaction(&tx) {
75                Ok(sig) => {
76                    return Some((sig, timestamp(), id));
77                }
78                Err(e) => {
79                    info!("error: {:#?}", e);
80                }
81            }
82            None
83        });
84        let mut sigs_w = self.sigs.write().unwrap();
85        sigs_w.extend(new_sigs);
86        ids
87    }
88
89    pub fn drain_cleared(&self) -> Vec<u64> {
90        std::mem::take(&mut *self.cleared.write().unwrap())
91    }
92
93    pub fn close(self) {
94        self.exit.store(true, Ordering::Relaxed);
95        self.sig_clear_t.join().unwrap();
96    }
97
98    fn start_sig_clear_thread(
99        exit: Arc<AtomicBool>,
100        sigs: &Arc<RwLock<PendingQueue>>,
101        cleared: &Arc<RwLock<Vec<u64>>>,
102        client: &Arc<RpcClient>,
103    ) -> JoinHandle<()> {
104        let sigs = sigs.clone();
105        let cleared = cleared.clone();
106        let client = client.clone();
107        Builder::new()
108            .name("solSigClear".to_string())
109            .spawn(move || {
110                let mut success = 0;
111                let mut error_count = 0;
112                let mut timed_out = 0;
113                let mut last_log = Instant::now();
114                while !exit.load(Ordering::Relaxed) {
115                    let sigs_len = sigs.read().unwrap().len();
116                    if sigs_len > 0 {
117                        let mut sigs_w = sigs.write().unwrap();
118                        let mut start = Measure::start("sig_status");
119                        let statuses: Vec<_> = sigs_w
120                            .chunks(200)
121                            .flat_map(|sig_chunk| {
122                                let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect();
123                                client
124                                    .get_signature_statuses(&only_sigs)
125                                    .expect("status fail")
126                                    .value
127                            })
128                            .collect();
129                        let mut num_cleared = 0;
130                        let start_len = sigs_w.len();
131                        let now = timestamp();
132                        let mut new_ids = vec![];
133                        let mut i = 0;
134                        let mut j = 0;
135                        while i != sigs_w.len() {
136                            let mut retain = true;
137                            let sent_ts = sigs_w[i].1;
138                            if let Some(e) = &statuses[j] {
139                                debug!("error: {:?}", e);
140                                if e.status.is_ok() {
141                                    success += 1;
142                                } else {
143                                    error_count += 1;
144                                }
145                                num_cleared += 1;
146                                retain = false;
147                            } else if now - sent_ts > 30_000 {
148                                retain = false;
149                                timed_out += 1;
150                            }
151                            if !retain {
152                                new_ids.push(sigs_w.remove(i).2);
153                            } else {
154                                i += 1;
155                            }
156                            j += 1;
157                        }
158                        let final_sigs_len = sigs_w.len();
159                        drop(sigs_w);
160                        cleared.write().unwrap().extend(new_ids);
161                        start.stop();
162                        debug!(
163                            "sigs len: {:?} success: {} took: {}ms cleared: {}/{}",
164                            final_sigs_len,
165                            success,
166                            start.as_ms(),
167                            num_cleared,
168                            start_len,
169                        );
170                        if last_log.elapsed().as_millis() > 5000 {
171                            info!(
172                                "success: {} error: {} timed_out: {}",
173                                success, error_count, timed_out,
174                            );
175                            last_log = Instant::now();
176                        }
177                    }
178                    sleep(Duration::from_millis(200));
179                }
180            })
181            .unwrap()
182    }
183}