1use {
2 crate::{
3 nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
4 rpc_client::RpcClient as BlockingRpcClient,
5 },
6 bincode::serialize,
7 dashmap::DashMap,
8 futures_util::future::join_all,
9 solana_hash::Hash,
10 solana_message::Message,
11 solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
12 solana_rpc_client::spinner::{self, SendTransactionProgress},
13 solana_rpc_client_api::{
14 client_error::ErrorKind,
15 config::RpcSendTransactionConfig,
16 request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
17 response::RpcSimulateTransactionResult,
18 },
19 solana_signature::Signature,
20 solana_signer::{signers::Signers, SignerError},
21 solana_tpu_client::tpu_client::{Result, TpuSenderError},
22 solana_transaction::Transaction,
23 solana_transaction_error::TransactionError,
24 std::{
25 sync::{
26 atomic::{AtomicU64, AtomicUsize, Ordering},
27 Arc,
28 },
29 time::Duration,
30 },
31 tokio::{sync::RwLock, task::JoinHandle},
32};
33
/// Pause between two refreshes of the shared blockhash / block height by the
/// background task created in `create_blockhash_data_updating_task`.
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
/// Stagger step between consecutive sends: the n-th transaction of a batch
/// sleeps `n * SEND_INTERVAL` before it is sent.
const SEND_INTERVAL: Duration = Duration::from_millis(10);
/// Upper bound on how long a single `send_wire_transaction` call over the TPU
/// client is awaited before it is treated as not sent.
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
40
/// Shorthand for the nonblocking TPU client specialised to the QUIC pool,
/// connection manager and config types.
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
42
/// Bookkeeping kept for every transaction that has been sent but not yet
/// confirmed; stored in the unconfirmed map keyed by the transaction signature.
#[derive(Clone, Debug)]
struct TransactionData {
    /// Last valid block height of the blockhash the transaction was signed with.
    last_valid_block_height: u64,
    /// The message the transaction was built from, kept so it can be signed
    /// again in a later retry round.
    message: Message,
    /// Position of the message in the caller-supplied slice; used as the key
    /// when recording an error for this transaction.
    index: usize,
    /// Serialized signed transaction, resent as-is over the TPU client.
    serialized_transaction: Vec<u8>,
}
50
/// A blockhash together with the last block height at which it is still valid,
/// as returned by `get_latest_blockhash_with_commitment`.
#[derive(Clone, Debug, Copy)]
struct BlockHashData {
    /// Blockhash used to sign transactions.
    pub blockhash: Hash,
    /// Last block height at which `blockhash` is still accepted.
    pub last_valid_block_height: u64,
}
56
/// Legacy configuration for the parallel send-and-confirm helpers.
///
/// The deprecated entry points convert it into a [`SendAndConfirmConfigV2`]
/// with a default `RpcSendTransactionConfig`.
#[deprecated(
    since = "2.2.0",
    note = "Use SendAndConfirmConfigV2 with send_and_confirm_transactions_in_parallel_v2"
)]
#[derive(Clone, Debug, Copy)]
pub struct SendAndConfirmConfig {
    /// When `true`, a progress bar is created and updated while sending.
    pub with_spinner: bool,
    /// Number of sign-and-send rounds to attempt; `None` is treated as 1.
    pub resign_txs_count: Option<usize>,
}
67
/// Configuration for `send_and_confirm_transactions_in_parallel_v2` and its
/// blocking counterpart.
#[derive(Clone, Debug, Copy)]
pub struct SendAndConfirmConfigV2 {
    /// When `true`, a progress bar is created and updated while sending.
    pub with_spinner: bool,
    /// Number of sign-and-send rounds to attempt; `None` is treated as 1.
    pub resign_txs_count: Option<usize>,
    /// Base config for sends that go through RPC. Its `preflight_commitment`
    /// is overridden with the RPC client's commitment level when sending.
    pub rpc_send_transaction_config: RpcSendTransactionConfig,
}
75
76#[allow(deprecated)]
77#[deprecated(
78 since = "2.2.0",
79 note = "Use send_and_confirm_transactions_in_parallel_v2"
80)]
81pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
82 rpc_client: Arc<RpcClient>,
83 tpu_client: Option<QuicTpuClient>,
84 messages: &[Message],
85 signers: &T,
86 config: SendAndConfirmConfig,
87) -> Result<Vec<Option<TransactionError>>> {
88 let config_v2 = SendAndConfirmConfigV2 {
89 with_spinner: config.with_spinner,
90 resign_txs_count: config.resign_txs_count,
91 rpc_send_transaction_config: RpcSendTransactionConfig {
92 ..RpcSendTransactionConfig::default()
93 },
94 };
95 send_and_confirm_transactions_in_parallel_v2(
96 rpc_client, tpu_client, messages, signers, config_v2,
97 )
98 .await
99}
100
101#[allow(deprecated)]
102#[deprecated(
103 since = "2.2.0",
104 note = "Use send_and_confirm_transactions_in_parallel_blocking_v2"
105)]
106pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
107 rpc_client: Arc<BlockingRpcClient>,
108 tpu_client: Option<QuicTpuClient>,
109 messages: &[Message],
110 signers: &T,
111 config: SendAndConfirmConfig,
112) -> Result<Vec<Option<TransactionError>>> {
113 let config_v2 = SendAndConfirmConfigV2 {
114 with_spinner: config.with_spinner,
115 resign_txs_count: config.resign_txs_count,
116 rpc_send_transaction_config: RpcSendTransactionConfig {
117 ..RpcSendTransactionConfig::default()
118 },
119 };
120 send_and_confirm_transactions_in_parallel_blocking_v2(
121 rpc_client, tpu_client, messages, signers, config_v2,
122 )
123}
124
125pub fn send_and_confirm_transactions_in_parallel_blocking_v2<T: Signers + ?Sized>(
127 rpc_client: Arc<BlockingRpcClient>,
128 tpu_client: Option<QuicTpuClient>,
129 messages: &[Message],
130 signers: &T,
131 config: SendAndConfirmConfigV2,
132) -> Result<Vec<Option<TransactionError>>> {
133 let fut = send_and_confirm_transactions_in_parallel_v2(
134 rpc_client.get_inner_client().clone(),
135 tpu_client,
136 messages,
137 signers,
138 config,
139 );
140 tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
141}
142
143fn create_blockhash_data_updating_task(
144 rpc_client: Arc<RpcClient>,
145 blockhash_data_rw: Arc<RwLock<BlockHashData>>,
146 current_block_height: Arc<AtomicU64>,
147) -> JoinHandle<()> {
148 tokio::spawn(async move {
149 loop {
150 if let Ok((blockhash, last_valid_block_height)) = rpc_client
151 .get_latest_blockhash_with_commitment(rpc_client.commitment())
152 .await
153 {
154 *blockhash_data_rw.write().await = BlockHashData {
155 blockhash,
156 last_valid_block_height,
157 };
158 }
159
160 if let Ok(block_height) = rpc_client.get_block_height().await {
161 current_block_height.store(block_height, Ordering::Relaxed);
162 }
163 tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
164 }
165 })
166}
167
168fn create_transaction_confirmation_task(
169 rpc_client: Arc<RpcClient>,
170 current_block_height: Arc<AtomicU64>,
171 unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
172 errors_map: Arc<DashMap<usize, TransactionError>>,
173 num_confirmed_transactions: Arc<AtomicUsize>,
174) -> JoinHandle<()> {
175 tokio::spawn(async move {
176 let mut last_block_height = current_block_height.load(Ordering::Relaxed);
178
179 loop {
180 if !unconfirmed_transaction_map.is_empty() {
181 let current_block_height = current_block_height.load(Ordering::Relaxed);
182 let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
183 .iter()
184 .filter(|x| {
185 let is_not_expired = current_block_height <= x.last_valid_block_height;
186 let is_recently_expired = last_block_height <= x.last_valid_block_height
188 && current_block_height > x.last_valid_block_height;
189 is_not_expired || is_recently_expired
190 })
191 .map(|x| *x.key())
192 .collect();
193 for signatures in
194 transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
195 {
196 if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
197 let statuses = result.value;
198 for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
199 if let Some((status, data)) = status
200 .filter(|status| {
201 status.satisfies_commitment(rpc_client.commitment())
202 })
203 .and_then(|status| {
204 unconfirmed_transaction_map
205 .remove(signature)
206 .map(|(_, data)| (status, data))
207 })
208 {
209 num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
210 match status.err {
211 Some(TransactionError::AlreadyProcessed) | None => {}
212 Some(error) => {
213 errors_map.insert(data.index, error);
214 }
215 }
216 };
217 }
218 }
219 }
220
221 last_block_height = current_block_height;
222 }
223 tokio::time::sleep(Duration::from_secs(1)).await;
224 }
225 })
226}
227
/// Shared state handed to the sending and confirming helpers of one
/// send-and-confirm run.
#[derive(Clone, Debug)]
struct SendingContext {
    /// Transactions that were sent and are not yet confirmed, keyed by signature.
    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
    /// Transaction errors recorded so far, keyed by message index.
    error_map: Arc<DashMap<usize, TransactionError>>,
    /// Most recently fetched blockhash and its last valid block height.
    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
    /// Counter of transactions confirmed so far (reported in progress output).
    num_confirmed_transactions: Arc<AtomicUsize>,
    /// Total number of messages in the run (reported in progress output).
    total_transactions: usize,
    /// Most recently fetched block height.
    current_block_height: Arc<AtomicU64>,
}
237fn progress_from_context_and_block_height(
238 context: &SendingContext,
239 last_valid_block_height: u64,
240) -> SendTransactionProgress {
241 SendTransactionProgress {
242 confirmed_transactions: context
243 .num_confirmed_transactions
244 .load(std::sync::atomic::Ordering::Relaxed),
245 total_transactions: context.total_transactions,
246 block_height: context
247 .current_block_height
248 .load(std::sync::atomic::Ordering::Relaxed),
249 last_valid_block_height,
250 }
251}
252
253async fn send_transaction_with_rpc_fallback(
254 rpc_client: &RpcClient,
255 tpu_client: &Option<QuicTpuClient>,
256 transaction: Transaction,
257 serialized_transaction: Vec<u8>,
258 context: &SendingContext,
259 index: usize,
260 rpc_send_transaction_config: RpcSendTransactionConfig,
261) -> Result<()> {
262 let send_over_rpc = if let Some(tpu_client) = tpu_client {
263 !tokio::time::timeout(
264 SEND_TIMEOUT_INTERVAL,
265 tpu_client.send_wire_transaction(serialized_transaction.clone()),
266 )
267 .await
268 .unwrap_or(false)
269 } else {
270 true
271 };
272 if send_over_rpc {
273 if let Err(e) = rpc_client
274 .send_transaction_with_config(
275 &transaction,
276 RpcSendTransactionConfig {
277 preflight_commitment: Some(rpc_client.commitment().commitment),
278 ..rpc_send_transaction_config
279 },
280 )
281 .await
282 {
283 match &e.kind {
284 ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
285 }
287 ErrorKind::TransactionError(TransactionError::BlockhashNotFound)
288 | ErrorKind::RpcError(RpcError::RpcResponseError {
289 data:
290 RpcResponseErrorData::SendTransactionPreflightFailure(
291 RpcSimulateTransactionResult {
292 err: Some(TransactionError::BlockhashNotFound),
293 ..
294 },
295 ),
296 ..
297 }) => {
298 }
300 ErrorKind::TransactionError(transaction_error)
301 | ErrorKind::RpcError(RpcError::RpcResponseError {
302 data:
303 RpcResponseErrorData::SendTransactionPreflightFailure(
304 RpcSimulateTransactionResult {
305 err: Some(transaction_error),
306 ..
307 },
308 ),
309 ..
310 }) => {
311 context.error_map.insert(index, transaction_error.clone());
313 }
314 _ => {
315 return Err(TpuSenderError::from(e));
316 }
317 }
318 }
319 }
320 Ok(())
321}
322
323async fn sign_all_messages_and_send<T: Signers + ?Sized>(
324 progress_bar: &Option<indicatif::ProgressBar>,
325 rpc_client: &RpcClient,
326 tpu_client: &Option<QuicTpuClient>,
327 messages_with_index: Vec<(usize, Message)>,
328 signers: &T,
329 context: &SendingContext,
330 rpc_send_transaction_config: RpcSendTransactionConfig,
331) -> Result<()> {
332 let current_transaction_count = messages_with_index.len();
333 let mut futures = vec![];
334 for (counter, (index, message)) in messages_with_index.iter().enumerate() {
336 let mut transaction = Transaction::new_unsigned(message.clone());
337 futures.push(async move {
338 tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
339 let blockhashdata = *context.blockhash_data_rw.read().await;
340
341 transaction
343 .try_sign(signers, blockhashdata.blockhash)
344 .expect("Transaction should be signable");
345 let serialized_transaction =
346 serialize(&transaction).expect("Transaction should serialize");
347 let signature = transaction.signatures[0];
348
349 context.unconfirmed_transaction_map.insert(
351 signature,
352 TransactionData {
353 index: *index,
354 serialized_transaction: serialized_transaction.clone(),
355 last_valid_block_height: blockhashdata.last_valid_block_height,
356 message: message.clone(),
357 },
358 );
359 if let Some(progress_bar) = progress_bar {
360 let progress = progress_from_context_and_block_height(
361 context,
362 blockhashdata.last_valid_block_height,
363 );
364 progress.set_message_for_confirmed_transactions(
365 progress_bar,
366 &format!(
367 "Sending {}/{} transactions",
368 counter + 1,
369 current_transaction_count,
370 ),
371 );
372 }
373 send_transaction_with_rpc_fallback(
374 rpc_client,
375 tpu_client,
376 transaction,
377 serialized_transaction,
378 context,
379 *index,
380 rpc_send_transaction_config,
381 )
382 .await
383 });
384 }
385 join_all(futures)
387 .await
388 .into_iter()
389 .collect::<Result<Vec<()>>>()?;
390 Ok(())
391}
392
393async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
394 progress_bar: &Option<indicatif::ProgressBar>,
395 tpu_client: &Option<QuicTpuClient>,
396 context: &SendingContext,
397) {
398 let unconfirmed_transaction_map = context.unconfirmed_transaction_map.clone();
399 let current_block_height = context.current_block_height.clone();
400
401 let transactions_to_confirm = unconfirmed_transaction_map.len();
402 let max_valid_block_height = unconfirmed_transaction_map
403 .iter()
404 .map(|x| x.last_valid_block_height)
405 .max();
406
407 if let Some(mut max_valid_block_height) = max_valid_block_height {
408 if let Some(progress_bar) = progress_bar {
409 let progress = progress_from_context_and_block_height(context, max_valid_block_height);
410 progress.set_message_for_confirmed_transactions(
411 progress_bar,
412 &format!(
413 "Waiting for next block, {transactions_to_confirm} transactions pending..."
414 ),
415 );
416 }
417
418 while !unconfirmed_transaction_map.is_empty()
420 && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
421 {
422 let block_height = current_block_height.load(Ordering::Relaxed);
423
424 if let Some(tpu_client) = tpu_client {
425 let txs_to_resend_over_tpu = unconfirmed_transaction_map
428 .iter()
429 .filter(|x| block_height < x.last_valid_block_height)
430 .map(|x| x.serialized_transaction.clone())
431 .collect::<Vec<_>>();
432 send_staggered_transactions(
433 progress_bar,
434 tpu_client,
435 txs_to_resend_over_tpu,
436 max_valid_block_height,
437 context,
438 )
439 .await;
440 } else {
441 tokio::time::sleep(Duration::from_millis(100)).await;
442 }
443 if let Some(max_valid_block_height_in_remaining_transaction) =
444 unconfirmed_transaction_map
445 .iter()
446 .map(|x| x.last_valid_block_height)
447 .max()
448 {
449 max_valid_block_height = max_valid_block_height_in_remaining_transaction;
450 }
451 }
452 }
453}
454
455async fn send_staggered_transactions(
456 progress_bar: &Option<indicatif::ProgressBar>,
457 tpu_client: &QuicTpuClient,
458 wire_transactions: Vec<Vec<u8>>,
459 last_valid_block_height: u64,
460 context: &SendingContext,
461) {
462 let current_transaction_count = wire_transactions.len();
463 let futures = wire_transactions
464 .into_iter()
465 .enumerate()
466 .map(|(counter, transaction)| async move {
467 tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
468 if let Some(progress_bar) = progress_bar {
469 let progress =
470 progress_from_context_and_block_height(context, last_valid_block_height);
471 progress.set_message_for_confirmed_transactions(
472 progress_bar,
473 &format!(
474 "Resending {}/{} transactions",
475 counter + 1,
476 current_transaction_count,
477 ),
478 );
479 }
480 tokio::time::timeout(
481 SEND_TIMEOUT_INTERVAL,
482 tpu_client.send_wire_transaction(transaction),
483 )
484 .await
485 })
486 .collect::<Vec<_>>();
487 join_all(futures).await;
488}
489
490pub async fn send_and_confirm_transactions_in_parallel_v2<T: Signers + ?Sized>(
496 rpc_client: Arc<RpcClient>,
497 tpu_client: Option<QuicTpuClient>,
498 messages: &[Message],
499 signers: &T,
500 config: SendAndConfirmConfigV2,
501) -> Result<Vec<Option<TransactionError>>> {
502 let (blockhash, last_valid_block_height) = rpc_client
504 .get_latest_blockhash_with_commitment(rpc_client.commitment())
505 .await?;
506 let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
507 blockhash,
508 last_valid_block_height,
509 }));
510
511 messages
513 .iter()
514 .map(|x| {
515 let mut transaction = Transaction::new_unsigned(x.clone());
516 transaction.try_sign(signers, blockhash)
517 })
518 .collect::<std::result::Result<Vec<()>, SignerError>>()?;
519
520 let block_height = rpc_client.get_block_height().await?;
522 let current_block_height = Arc::new(AtomicU64::new(block_height));
523
524 let progress_bar = config.with_spinner.then(|| {
525 let progress_bar = spinner::new_progress_bar();
526 progress_bar.set_message("Setting up...");
527 progress_bar
528 });
529
530 let block_data_task = create_blockhash_data_updating_task(
532 rpc_client.clone(),
533 blockhash_data_rw.clone(),
534 current_block_height.clone(),
535 );
536
537 let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
538 let error_map = Arc::new(DashMap::new());
539 let num_confirmed_transactions = Arc::new(AtomicUsize::new(0));
540 let transaction_confirming_task = create_transaction_confirmation_task(
542 rpc_client.clone(),
543 current_block_height.clone(),
544 unconfirmed_transasction_map.clone(),
545 error_map.clone(),
546 num_confirmed_transactions.clone(),
547 );
548
549 let total_transactions = messages.len();
551 let mut initial = true;
552 let signing_count = config.resign_txs_count.unwrap_or(1);
553 let context = SendingContext {
554 unconfirmed_transaction_map: unconfirmed_transasction_map.clone(),
555 blockhash_data_rw: blockhash_data_rw.clone(),
556 num_confirmed_transactions: num_confirmed_transactions.clone(),
557 current_block_height: current_block_height.clone(),
558 error_map: error_map.clone(),
559 total_transactions,
560 };
561
562 for expired_blockhash_retries in (0..signing_count).rev() {
563 let messages_with_index: Vec<(usize, Message)> = if initial {
565 initial = false;
566 messages.iter().cloned().enumerate().collect()
567 } else {
568 unconfirmed_transasction_map
570 .iter()
571 .map(|x| (x.index, x.message.clone()))
572 .collect()
573 };
574
575 if messages_with_index.is_empty() {
576 break;
577 }
578
579 unconfirmed_transasction_map.clear();
581
582 sign_all_messages_and_send(
583 &progress_bar,
584 &rpc_client,
585 &tpu_client,
586 messages_with_index,
587 signers,
588 &context,
589 config.rpc_send_transaction_config,
590 )
591 .await?;
592 confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
593 &progress_bar,
594 &tpu_client,
595 &context,
596 )
597 .await;
598
599 if unconfirmed_transasction_map.is_empty() {
600 break;
601 }
602
603 if let Some(progress_bar) = &progress_bar {
604 progress_bar.println(format!(
605 "Blockhash expired. {expired_blockhash_retries} retries remaining"
606 ));
607 }
608 }
609
610 block_data_task.abort();
611 transaction_confirming_task.abort();
612 if unconfirmed_transasction_map.is_empty() {
613 let mut transaction_errors = vec![None; messages.len()];
614 for iterator in error_map.iter() {
615 transaction_errors[*iterator.key()] = Some(iterator.value().clone());
616 }
617 Ok(transaction_errors)
618 } else {
619 Err(TpuSenderError::Custom("Max retries exceeded".into()))
620 }
621}