safecoin_client/
transaction_executor.rs1#![allow(clippy::integer_arithmetic)]
2use {
3 crate::rpc_client::RpcClient,
4 log::*,
5 safecoin_measure::measure::Measure,
6 solana_sdk::{
7 commitment_config::CommitmentConfig, signature::Signature, timing::timestamp,
8 transaction::Transaction,
9 },
10 std::{
11 net::SocketAddr,
12 sync::{
13 atomic::{AtomicBool, AtomicU64, Ordering},
14 Arc, RwLock,
15 },
16 thread::{sleep, Builder, JoinHandle},
17 time::{Duration, Instant},
18 },
19};
20
21type PendingQueue = Vec<(Signature, u64, u64)>;
23
24pub struct TransactionExecutor {
25 sig_clear_t: JoinHandle<()>,
26 sigs: Arc<RwLock<PendingQueue>>,
27 cleared: Arc<RwLock<Vec<u64>>>,
28 exit: Arc<AtomicBool>,
29 counter: AtomicU64,
30 client: RpcClient,
31}
32
33impl TransactionExecutor {
34 pub fn new(entrypoint_addr: SocketAddr) -> Self {
35 let sigs = Arc::new(RwLock::new(Vec::new()));
36 let cleared = Arc::new(RwLock::new(Vec::new()));
37 let exit = Arc::new(AtomicBool::new(false));
38 let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr);
39 let client =
40 RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed());
41 Self {
42 sigs,
43 cleared,
44 sig_clear_t,
45 exit,
46 counter: AtomicU64::new(0),
47 client,
48 }
49 }
50
51 pub fn num_outstanding(&self) -> usize {
52 self.sigs.read().unwrap().len()
53 }
54
55 pub fn push_transactions(&self, txs: Vec<Transaction>) -> Vec<u64> {
56 let mut ids = vec![];
57 let new_sigs = txs.into_iter().filter_map(|tx| {
58 let id = self.counter.fetch_add(1, Ordering::Relaxed);
59 ids.push(id);
60 match self.client.send_transaction(&tx) {
61 Ok(sig) => {
62 return Some((sig, timestamp(), id));
63 }
64 Err(e) => {
65 info!("error: {:#?}", e);
66 }
67 }
68 None
69 });
70 let mut sigs_w = self.sigs.write().unwrap();
71 sigs_w.extend(new_sigs);
72 ids
73 }
74
75 pub fn drain_cleared(&self) -> Vec<u64> {
76 std::mem::take(&mut *self.cleared.write().unwrap())
77 }
78
79 pub fn close(self) {
80 self.exit.store(true, Ordering::Relaxed);
81 self.sig_clear_t.join().unwrap();
82 }
83
84 fn start_sig_clear_thread(
85 exit: &Arc<AtomicBool>,
86 sigs: &Arc<RwLock<PendingQueue>>,
87 cleared: &Arc<RwLock<Vec<u64>>>,
88 entrypoint_addr: SocketAddr,
89 ) -> JoinHandle<()> {
90 let sigs = sigs.clone();
91 let exit = exit.clone();
92 let cleared = cleared.clone();
93 Builder::new()
94 .name("solSigClear".to_string())
95 .spawn(move || {
96 let client = RpcClient::new_socket_with_commitment(
97 entrypoint_addr,
98 CommitmentConfig::confirmed(),
99 );
100 let mut success = 0;
101 let mut error_count = 0;
102 let mut timed_out = 0;
103 let mut last_log = Instant::now();
104 while !exit.load(Ordering::Relaxed) {
105 let sigs_len = sigs.read().unwrap().len();
106 if sigs_len > 0 {
107 let mut sigs_w = sigs.write().unwrap();
108 let mut start = Measure::start("sig_status");
109 let statuses: Vec<_> = sigs_w
110 .chunks(200)
111 .flat_map(|sig_chunk| {
112 let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect();
113 client
114 .get_signature_statuses(&only_sigs)
115 .expect("status fail")
116 .value
117 })
118 .collect();
119 let mut num_cleared = 0;
120 let start_len = sigs_w.len();
121 let now = timestamp();
122 let mut new_ids = vec![];
123 let mut i = 0;
124 let mut j = 0;
125 while i != sigs_w.len() {
126 let mut retain = true;
127 let sent_ts = sigs_w[i].1;
128 if let Some(e) = &statuses[j] {
129 debug!("error: {:?}", e);
130 if e.status.is_ok() {
131 success += 1;
132 } else {
133 error_count += 1;
134 }
135 num_cleared += 1;
136 retain = false;
137 } else if now - sent_ts > 30_000 {
138 retain = false;
139 timed_out += 1;
140 }
141 if !retain {
142 new_ids.push(sigs_w.remove(i).2);
143 } else {
144 i += 1;
145 }
146 j += 1;
147 }
148 let final_sigs_len = sigs_w.len();
149 drop(sigs_w);
150 cleared.write().unwrap().extend(new_ids);
151 start.stop();
152 debug!(
153 "sigs len: {:?} success: {} took: {}ms cleared: {}/{}",
154 final_sigs_len,
155 success,
156 start.as_ms(),
157 num_cleared,
158 start_len,
159 );
160 if last_log.elapsed().as_millis() > 5000 {
161 info!(
162 "success: {} error: {} timed_out: {}",
163 success, error_count, timed_out,
164 );
165 last_log = Instant::now();
166 }
167 }
168 sleep(Duration::from_millis(200));
169 }
170 })
171 .unwrap()
172 }
173}