safecoin_client/
transaction_executor.rs

1#![allow(clippy::integer_arithmetic)]
2use {
3    crate::rpc_client::RpcClient,
4    log::*,
5    safecoin_measure::measure::Measure,
6    solana_sdk::{
7        commitment_config::CommitmentConfig, signature::Signature, timing::timestamp,
8        transaction::Transaction,
9    },
10    std::{
11        net::SocketAddr,
12        sync::{
13            atomic::{AtomicBool, AtomicU64, Ordering},
14            Arc, RwLock,
15        },
16        thread::{sleep, Builder, JoinHandle},
17        time::{Duration, Instant},
18    },
19};
20
/// Queue of transactions that have been sent and are still awaiting a status.
///
/// Each entry is `(signature, timestamp, id)`:
/// * `signature` - the signature returned when the transaction was sent,
/// * `timestamp` - the `timestamp()` value recorded right after the send,
/// * `id` - the executor-assigned id handed back to the caller.
type PendingQueue = Vec<(Signature, u64, u64)>;
23
/// Sends transactions over RPC and tracks them until a background thread
/// observes a status for them or gives up on them.
pub struct TransactionExecutor {
    /// Handle of the background signature-clearing thread; joined in `close`.
    sig_clear_t: JoinHandle<()>,
    /// Transactions sent but not yet cleared; shared with the background thread.
    sigs: Arc<RwLock<PendingQueue>>,
    /// Ids the background thread has removed from `sigs`; emptied by `drain_cleared`.
    cleared: Arc<RwLock<Vec<u64>>>,
    /// Set to `true` by `close` to ask the background thread to stop.
    exit: Arc<AtomicBool>,
    /// Source of the ids assigned in `push_transactions`.
    counter: AtomicU64,
    /// RPC client used to send transactions.
    client: RpcClient,
}
32
33impl TransactionExecutor {
34    pub fn new(entrypoint_addr: SocketAddr) -> Self {
35        let sigs = Arc::new(RwLock::new(Vec::new()));
36        let cleared = Arc::new(RwLock::new(Vec::new()));
37        let exit = Arc::new(AtomicBool::new(false));
38        let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr);
39        let client =
40            RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed());
41        Self {
42            sigs,
43            cleared,
44            sig_clear_t,
45            exit,
46            counter: AtomicU64::new(0),
47            client,
48        }
49    }
50
51    pub fn num_outstanding(&self) -> usize {
52        self.sigs.read().unwrap().len()
53    }
54
55    pub fn push_transactions(&self, txs: Vec<Transaction>) -> Vec<u64> {
56        let mut ids = vec![];
57        let new_sigs = txs.into_iter().filter_map(|tx| {
58            let id = self.counter.fetch_add(1, Ordering::Relaxed);
59            ids.push(id);
60            match self.client.send_transaction(&tx) {
61                Ok(sig) => {
62                    return Some((sig, timestamp(), id));
63                }
64                Err(e) => {
65                    info!("error: {:#?}", e);
66                }
67            }
68            None
69        });
70        let mut sigs_w = self.sigs.write().unwrap();
71        sigs_w.extend(new_sigs);
72        ids
73    }
74
75    pub fn drain_cleared(&self) -> Vec<u64> {
76        std::mem::take(&mut *self.cleared.write().unwrap())
77    }
78
79    pub fn close(self) {
80        self.exit.store(true, Ordering::Relaxed);
81        self.sig_clear_t.join().unwrap();
82    }
83
84    fn start_sig_clear_thread(
85        exit: &Arc<AtomicBool>,
86        sigs: &Arc<RwLock<PendingQueue>>,
87        cleared: &Arc<RwLock<Vec<u64>>>,
88        entrypoint_addr: SocketAddr,
89    ) -> JoinHandle<()> {
90        let sigs = sigs.clone();
91        let exit = exit.clone();
92        let cleared = cleared.clone();
93        Builder::new()
94            .name("solSigClear".to_string())
95            .spawn(move || {
96                let client = RpcClient::new_socket_with_commitment(
97                    entrypoint_addr,
98                    CommitmentConfig::confirmed(),
99                );
100                let mut success = 0;
101                let mut error_count = 0;
102                let mut timed_out = 0;
103                let mut last_log = Instant::now();
104                while !exit.load(Ordering::Relaxed) {
105                    let sigs_len = sigs.read().unwrap().len();
106                    if sigs_len > 0 {
107                        let mut sigs_w = sigs.write().unwrap();
108                        let mut start = Measure::start("sig_status");
109                        let statuses: Vec<_> = sigs_w
110                            .chunks(200)
111                            .flat_map(|sig_chunk| {
112                                let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect();
113                                client
114                                    .get_signature_statuses(&only_sigs)
115                                    .expect("status fail")
116                                    .value
117                            })
118                            .collect();
119                        let mut num_cleared = 0;
120                        let start_len = sigs_w.len();
121                        let now = timestamp();
122                        let mut new_ids = vec![];
123                        let mut i = 0;
124                        let mut j = 0;
125                        while i != sigs_w.len() {
126                            let mut retain = true;
127                            let sent_ts = sigs_w[i].1;
128                            if let Some(e) = &statuses[j] {
129                                debug!("error: {:?}", e);
130                                if e.status.is_ok() {
131                                    success += 1;
132                                } else {
133                                    error_count += 1;
134                                }
135                                num_cleared += 1;
136                                retain = false;
137                            } else if now - sent_ts > 30_000 {
138                                retain = false;
139                                timed_out += 1;
140                            }
141                            if !retain {
142                                new_ids.push(sigs_w.remove(i).2);
143                            } else {
144                                i += 1;
145                            }
146                            j += 1;
147                        }
148                        let final_sigs_len = sigs_w.len();
149                        drop(sigs_w);
150                        cleared.write().unwrap().extend(new_ids);
151                        start.stop();
152                        debug!(
153                            "sigs len: {:?} success: {} took: {}ms cleared: {}/{}",
154                            final_sigs_len,
155                            success,
156                            start.as_ms(),
157                            num_cleared,
158                            start_len,
159                        );
160                        if last_log.elapsed().as_millis() > 5000 {
161                            info!(
162                                "success: {} error: {} timed_out: {}",
163                                success, error_count, timed_out,
164                            );
165                            last_log = Instant::now();
166                        }
167                    }
168                    sleep(Duration::from_millis(200));
169                }
170            })
171            .unwrap()
172    }
173}