solana_client/
transaction_executor.rs1#![allow(clippy::arithmetic_side_effects)]
2use {
3 log::*,
4 solana_commitment_config::CommitmentConfig,
5 solana_measure::measure::Measure,
6 solana_rpc_client::rpc_client::RpcClient,
7 solana_signature::Signature,
8 solana_time_utils::timestamp,
9 solana_transaction::Transaction,
10 std::{
11 net::SocketAddr,
12 sync::{
13 atomic::{AtomicBool, AtomicU64, Ordering},
14 Arc, RwLock,
15 },
16 thread::{sleep, Builder, JoinHandle},
17 time::{Duration, Instant},
18 },
19};
20
21type PendingQueue = Vec<(Signature, u64, u64)>;
23
24pub struct TransactionExecutor {
25 sig_clear_t: JoinHandle<()>,
26 sigs: Arc<RwLock<PendingQueue>>,
27 cleared: Arc<RwLock<Vec<u64>>>,
28 exit: Arc<AtomicBool>,
29 counter: AtomicU64,
30 client: Arc<RpcClient>,
31}
32
33impl TransactionExecutor {
34 pub fn new(entrypoint_addr: SocketAddr) -> Self {
35 let client = Arc::new(RpcClient::new_socket_with_commitment(
36 entrypoint_addr,
37 CommitmentConfig::confirmed(),
38 ));
39 Self::new_with_rpc_client(client)
40 }
41
42 pub fn new_with_url<U: ToString>(url: U) -> Self {
43 let client = Arc::new(RpcClient::new_with_commitment(
44 url,
45 CommitmentConfig::confirmed(),
46 ));
47 Self::new_with_rpc_client(client)
48 }
49
50 pub fn new_with_rpc_client(client: Arc<RpcClient>) -> Self {
51 let sigs = Arc::new(RwLock::new(Vec::new()));
52 let cleared = Arc::new(RwLock::new(Vec::new()));
53 let exit = Arc::new(AtomicBool::new(false));
54 let sig_clear_t = Self::start_sig_clear_thread(exit.clone(), &sigs, &cleared, &client);
55 Self {
56 sigs,
57 cleared,
58 sig_clear_t,
59 exit,
60 counter: AtomicU64::new(0),
61 client,
62 }
63 }
64
65 pub fn num_outstanding(&self) -> usize {
66 self.sigs.read().unwrap().len()
67 }
68
69 pub fn push_transactions(&self, txs: Vec<Transaction>) -> Vec<u64> {
70 let mut ids = vec![];
71 let new_sigs = txs.into_iter().filter_map(|tx| {
72 let id = self.counter.fetch_add(1, Ordering::Relaxed);
73 ids.push(id);
74 match self.client.send_transaction(&tx) {
75 Ok(sig) => {
76 return Some((sig, timestamp(), id));
77 }
78 Err(e) => {
79 info!("error: {:#?}", e);
80 }
81 }
82 None
83 });
84 let mut sigs_w = self.sigs.write().unwrap();
85 sigs_w.extend(new_sigs);
86 ids
87 }
88
89 pub fn drain_cleared(&self) -> Vec<u64> {
90 std::mem::take(&mut *self.cleared.write().unwrap())
91 }
92
93 pub fn close(self) {
94 self.exit.store(true, Ordering::Relaxed);
95 self.sig_clear_t.join().unwrap();
96 }
97
98 fn start_sig_clear_thread(
99 exit: Arc<AtomicBool>,
100 sigs: &Arc<RwLock<PendingQueue>>,
101 cleared: &Arc<RwLock<Vec<u64>>>,
102 client: &Arc<RpcClient>,
103 ) -> JoinHandle<()> {
104 let sigs = sigs.clone();
105 let cleared = cleared.clone();
106 let client = client.clone();
107 Builder::new()
108 .name("solSigClear".to_string())
109 .spawn(move || {
110 let mut success = 0;
111 let mut error_count = 0;
112 let mut timed_out = 0;
113 let mut last_log = Instant::now();
114 while !exit.load(Ordering::Relaxed) {
115 let sigs_len = sigs.read().unwrap().len();
116 if sigs_len > 0 {
117 let mut sigs_w = sigs.write().unwrap();
118 let mut start = Measure::start("sig_status");
119 let statuses: Vec<_> = sigs_w
120 .chunks(200)
121 .flat_map(|sig_chunk| {
122 let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect();
123 client
124 .get_signature_statuses(&only_sigs)
125 .expect("status fail")
126 .value
127 })
128 .collect();
129 let mut num_cleared = 0;
130 let start_len = sigs_w.len();
131 let now = timestamp();
132 let mut new_ids = vec![];
133 let mut i = 0;
134 let mut j = 0;
135 while i != sigs_w.len() {
136 let mut retain = true;
137 let sent_ts = sigs_w[i].1;
138 if let Some(e) = &statuses[j] {
139 debug!("error: {:?}", e);
140 if e.status.is_ok() {
141 success += 1;
142 } else {
143 error_count += 1;
144 }
145 num_cleared += 1;
146 retain = false;
147 } else if now - sent_ts > 30_000 {
148 retain = false;
149 timed_out += 1;
150 }
151 if !retain {
152 new_ids.push(sigs_w.remove(i).2);
153 } else {
154 i += 1;
155 }
156 j += 1;
157 }
158 let final_sigs_len = sigs_w.len();
159 drop(sigs_w);
160 cleared.write().unwrap().extend(new_ids);
161 start.stop();
162 debug!(
163 "sigs len: {:?} success: {} took: {}ms cleared: {}/{}",
164 final_sigs_len,
165 success,
166 start.as_ms(),
167 num_cleared,
168 start_len,
169 );
170 if last_log.elapsed().as_millis() > 5000 {
171 info!(
172 "success: {} error: {} timed_out: {}",
173 success, error_count, timed_out,
174 );
175 last_log = Instant::now();
176 }
177 }
178 sleep(Duration::from_millis(200));
179 }
180 })
181 .unwrap()
182 }
183}