solana_client/
send_and_confirm_transactions_in_parallel.rs

1use {
2    crate::{
3        nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
4        rpc_client::RpcClient as BlockingRpcClient,
5    },
6    bincode::serialize,
7    dashmap::DashMap,
8    futures_util::future::join_all,
9    solana_hash::Hash,
10    solana_message::Message,
11    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
12    solana_rpc_client::spinner::{self, SendTransactionProgress},
13    solana_rpc_client_api::{
14        client_error::ErrorKind,
15        config::RpcSendTransactionConfig,
16        request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
17        response::RpcSimulateTransactionResult,
18    },
19    solana_signature::Signature,
20    solana_signer::{signers::Signers, SignerError},
21    solana_tpu_client::tpu_client::{Result, TpuSenderError},
22    solana_transaction::Transaction,
23    solana_transaction_error::TransactionError,
24    std::{
25        sync::{
26            atomic::{AtomicU64, AtomicUsize, Ordering},
27            Arc,
28        },
29        time::Duration,
30    },
31    tokio::{sync::RwLock, task::JoinHandle},
32};
33
/// Pause between two refreshes of the cached blockhash and block height.
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
/// Stagger applied between consecutive sends: the n-th transaction of a batch
/// waits `n * SEND_INTERVAL` before it is sent.
const SEND_INTERVAL: Duration = Duration::from_millis(10);
/// Upper bound on a single TPU send. This is a "reasonable" value for how long
/// fanning the transactions out should take, taken from
/// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`.
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
40
41type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
42
43#[derive(Clone, Debug)]
44struct TransactionData {
45    last_valid_block_height: u64,
46    message: Message,
47    index: usize,
48    serialized_transaction: Vec<u8>,
49}
50
51#[derive(Clone, Debug, Copy)]
52struct BlockHashData {
53    pub blockhash: Hash,
54    pub last_valid_block_height: u64,
55}
56
/// Legacy configuration, kept only so existing callers keep compiling.
///
/// The deprecated entry points translate it into [`SendAndConfirmConfigV2`]
/// with a default `RpcSendTransactionConfig`.
#[deprecated(
    since = "2.2.0",
    note = "Use SendAndConfirmConfigV2 with send_and_confirm_transactions_in_parallel_v2"
)]
#[derive(Debug, Clone, Copy)]
pub struct SendAndConfirmConfig {
    /// Show a progress spinner while sending and confirming.
    pub with_spinner: bool,
    /// Number of sign-and-send rounds to attempt; `None` means a single round.
    pub resign_txs_count: Option<usize>,
}
67
68// New struct with RpcSendTransactionConfig for non-breaking change
69#[derive(Clone, Debug, Copy)]
70pub struct SendAndConfirmConfigV2 {
71    pub with_spinner: bool,
72    pub resign_txs_count: Option<usize>,
73    pub rpc_send_transaction_config: RpcSendTransactionConfig,
74}
75
76#[allow(deprecated)]
77#[deprecated(
78    since = "2.2.0",
79    note = "Use send_and_confirm_transactions_in_parallel_v2"
80)]
81pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
82    rpc_client: Arc<RpcClient>,
83    tpu_client: Option<QuicTpuClient>,
84    messages: &[Message],
85    signers: &T,
86    config: SendAndConfirmConfig,
87) -> Result<Vec<Option<TransactionError>>> {
88    let config_v2 = SendAndConfirmConfigV2 {
89        with_spinner: config.with_spinner,
90        resign_txs_count: config.resign_txs_count,
91        rpc_send_transaction_config: RpcSendTransactionConfig {
92            ..RpcSendTransactionConfig::default()
93        },
94    };
95    send_and_confirm_transactions_in_parallel_v2(
96        rpc_client, tpu_client, messages, signers, config_v2,
97    )
98    .await
99}
100
101#[allow(deprecated)]
102#[deprecated(
103    since = "2.2.0",
104    note = "Use send_and_confirm_transactions_in_parallel_blocking_v2"
105)]
106pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
107    rpc_client: Arc<BlockingRpcClient>,
108    tpu_client: Option<QuicTpuClient>,
109    messages: &[Message],
110    signers: &T,
111    config: SendAndConfirmConfig,
112) -> Result<Vec<Option<TransactionError>>> {
113    let config_v2 = SendAndConfirmConfigV2 {
114        with_spinner: config.with_spinner,
115        resign_txs_count: config.resign_txs_count,
116        rpc_send_transaction_config: RpcSendTransactionConfig {
117            ..RpcSendTransactionConfig::default()
118        },
119    };
120    send_and_confirm_transactions_in_parallel_blocking_v2(
121        rpc_client, tpu_client, messages, signers, config_v2,
122    )
123}
124
125/// Sends and confirms transactions concurrently in a sync context
126pub fn send_and_confirm_transactions_in_parallel_blocking_v2<T: Signers + ?Sized>(
127    rpc_client: Arc<BlockingRpcClient>,
128    tpu_client: Option<QuicTpuClient>,
129    messages: &[Message],
130    signers: &T,
131    config: SendAndConfirmConfigV2,
132) -> Result<Vec<Option<TransactionError>>> {
133    let fut = send_and_confirm_transactions_in_parallel_v2(
134        rpc_client.get_inner_client().clone(),
135        tpu_client,
136        messages,
137        signers,
138        config,
139    );
140    tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
141}
142
143fn create_blockhash_data_updating_task(
144    rpc_client: Arc<RpcClient>,
145    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
146    current_block_height: Arc<AtomicU64>,
147) -> JoinHandle<()> {
148    tokio::spawn(async move {
149        loop {
150            if let Ok((blockhash, last_valid_block_height)) = rpc_client
151                .get_latest_blockhash_with_commitment(rpc_client.commitment())
152                .await
153            {
154                *blockhash_data_rw.write().await = BlockHashData {
155                    blockhash,
156                    last_valid_block_height,
157                };
158            }
159
160            if let Ok(block_height) = rpc_client.get_block_height().await {
161                current_block_height.store(block_height, Ordering::Relaxed);
162            }
163            tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
164        }
165    })
166}
167
168fn create_transaction_confirmation_task(
169    rpc_client: Arc<RpcClient>,
170    current_block_height: Arc<AtomicU64>,
171    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
172    errors_map: Arc<DashMap<usize, TransactionError>>,
173    num_confirmed_transactions: Arc<AtomicUsize>,
174) -> JoinHandle<()> {
175    tokio::spawn(async move {
176        // check transactions that are not expired or have just expired between two checks
177        let mut last_block_height = current_block_height.load(Ordering::Relaxed);
178
179        loop {
180            if !unconfirmed_transaction_map.is_empty() {
181                let current_block_height = current_block_height.load(Ordering::Relaxed);
182                let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
183                    .iter()
184                    .filter(|x| {
185                        let is_not_expired = current_block_height <= x.last_valid_block_height;
186                        // transaction expired between last and current check
187                        let is_recently_expired = last_block_height <= x.last_valid_block_height
188                            && current_block_height > x.last_valid_block_height;
189                        is_not_expired || is_recently_expired
190                    })
191                    .map(|x| *x.key())
192                    .collect();
193                for signatures in
194                    transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
195                {
196                    if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
197                        let statuses = result.value;
198                        for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
199                            if let Some((status, data)) = status
200                                .filter(|status| {
201                                    status.satisfies_commitment(rpc_client.commitment())
202                                })
203                                .and_then(|status| {
204                                    unconfirmed_transaction_map
205                                        .remove(signature)
206                                        .map(|(_, data)| (status, data))
207                                })
208                            {
209                                num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
210                                match status.err {
211                                    Some(TransactionError::AlreadyProcessed) | None => {}
212                                    Some(error) => {
213                                        errors_map.insert(data.index, error);
214                                    }
215                                }
216                            };
217                        }
218                    }
219                }
220
221                last_block_height = current_block_height;
222            }
223            tokio::time::sleep(Duration::from_secs(1)).await;
224        }
225    })
226}
227
228#[derive(Clone, Debug)]
229struct SendingContext {
230    unconfirmed_transaction_map: Arc<DashMap<Signature, TransactionData>>,
231    error_map: Arc<DashMap<usize, TransactionError>>,
232    blockhash_data_rw: Arc<RwLock<BlockHashData>>,
233    num_confirmed_transactions: Arc<AtomicUsize>,
234    total_transactions: usize,
235    current_block_height: Arc<AtomicU64>,
236}
237fn progress_from_context_and_block_height(
238    context: &SendingContext,
239    last_valid_block_height: u64,
240) -> SendTransactionProgress {
241    SendTransactionProgress {
242        confirmed_transactions: context
243            .num_confirmed_transactions
244            .load(std::sync::atomic::Ordering::Relaxed),
245        total_transactions: context.total_transactions,
246        block_height: context
247            .current_block_height
248            .load(std::sync::atomic::Ordering::Relaxed),
249        last_valid_block_height,
250    }
251}
252
253async fn send_transaction_with_rpc_fallback(
254    rpc_client: &RpcClient,
255    tpu_client: &Option<QuicTpuClient>,
256    transaction: Transaction,
257    serialized_transaction: Vec<u8>,
258    context: &SendingContext,
259    index: usize,
260    rpc_send_transaction_config: RpcSendTransactionConfig,
261) -> Result<()> {
262    let send_over_rpc = if let Some(tpu_client) = tpu_client {
263        !tokio::time::timeout(
264            SEND_TIMEOUT_INTERVAL,
265            tpu_client.send_wire_transaction(serialized_transaction.clone()),
266        )
267        .await
268        .unwrap_or(false)
269    } else {
270        true
271    };
272    if send_over_rpc {
273        if let Err(e) = rpc_client
274            .send_transaction_with_config(
275                &transaction,
276                RpcSendTransactionConfig {
277                    preflight_commitment: Some(rpc_client.commitment().commitment),
278                    ..rpc_send_transaction_config
279                },
280            )
281            .await
282        {
283            match &e.kind {
284                ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
285                    // fall through on io error, we will retry the transaction
286                }
287                ErrorKind::TransactionError(TransactionError::BlockhashNotFound)
288                | ErrorKind::RpcError(RpcError::RpcResponseError {
289                    data:
290                        RpcResponseErrorData::SendTransactionPreflightFailure(
291                            RpcSimulateTransactionResult {
292                                err: Some(TransactionError::BlockhashNotFound),
293                                ..
294                            },
295                        ),
296                    ..
297                }) => {
298                    // fall through so that we will resend with another blockhash
299                }
300                ErrorKind::TransactionError(transaction_error)
301                | ErrorKind::RpcError(RpcError::RpcResponseError {
302                    data:
303                        RpcResponseErrorData::SendTransactionPreflightFailure(
304                            RpcSimulateTransactionResult {
305                                err: Some(transaction_error),
306                                ..
307                            },
308                        ),
309                    ..
310                }) => {
311                    // if we get other than blockhash not found error the transaction is invalid
312                    context.error_map.insert(index, transaction_error.clone());
313                }
314                _ => {
315                    return Err(TpuSenderError::from(e));
316                }
317            }
318        }
319    }
320    Ok(())
321}
322
323async fn sign_all_messages_and_send<T: Signers + ?Sized>(
324    progress_bar: &Option<indicatif::ProgressBar>,
325    rpc_client: &RpcClient,
326    tpu_client: &Option<QuicTpuClient>,
327    messages_with_index: Vec<(usize, Message)>,
328    signers: &T,
329    context: &SendingContext,
330    rpc_send_transaction_config: RpcSendTransactionConfig,
331) -> Result<()> {
332    let current_transaction_count = messages_with_index.len();
333    let mut futures = vec![];
334    // send all the transaction messages
335    for (counter, (index, message)) in messages_with_index.iter().enumerate() {
336        let mut transaction = Transaction::new_unsigned(message.clone());
337        futures.push(async move {
338            tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
339            let blockhashdata = *context.blockhash_data_rw.read().await;
340
341            // we have already checked if all transactions are signable.
342            transaction
343                .try_sign(signers, blockhashdata.blockhash)
344                .expect("Transaction should be signable");
345            let serialized_transaction =
346                serialize(&transaction).expect("Transaction should serialize");
347            let signature = transaction.signatures[0];
348
349            // send to confirm the transaction
350            context.unconfirmed_transaction_map.insert(
351                signature,
352                TransactionData {
353                    index: *index,
354                    serialized_transaction: serialized_transaction.clone(),
355                    last_valid_block_height: blockhashdata.last_valid_block_height,
356                    message: message.clone(),
357                },
358            );
359            if let Some(progress_bar) = progress_bar {
360                let progress = progress_from_context_and_block_height(
361                    context,
362                    blockhashdata.last_valid_block_height,
363                );
364                progress.set_message_for_confirmed_transactions(
365                    progress_bar,
366                    &format!(
367                        "Sending {}/{} transactions",
368                        counter + 1,
369                        current_transaction_count,
370                    ),
371                );
372            }
373            send_transaction_with_rpc_fallback(
374                rpc_client,
375                tpu_client,
376                transaction,
377                serialized_transaction,
378                context,
379                *index,
380                rpc_send_transaction_config,
381            )
382            .await
383        });
384    }
385    // collect to convert Vec<Result<_>> to Result<Vec<_>>
386    join_all(futures)
387        .await
388        .into_iter()
389        .collect::<Result<Vec<()>>>()?;
390    Ok(())
391}
392
393async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
394    progress_bar: &Option<indicatif::ProgressBar>,
395    tpu_client: &Option<QuicTpuClient>,
396    context: &SendingContext,
397) {
398    let unconfirmed_transaction_map = context.unconfirmed_transaction_map.clone();
399    let current_block_height = context.current_block_height.clone();
400
401    let transactions_to_confirm = unconfirmed_transaction_map.len();
402    let max_valid_block_height = unconfirmed_transaction_map
403        .iter()
404        .map(|x| x.last_valid_block_height)
405        .max();
406
407    if let Some(mut max_valid_block_height) = max_valid_block_height {
408        if let Some(progress_bar) = progress_bar {
409            let progress = progress_from_context_and_block_height(context, max_valid_block_height);
410            progress.set_message_for_confirmed_transactions(
411                progress_bar,
412                &format!(
413                    "Waiting for next block, {transactions_to_confirm} transactions pending..."
414                ),
415            );
416        }
417
418        // wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
419        while !unconfirmed_transaction_map.is_empty()
420            && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
421        {
422            let block_height = current_block_height.load(Ordering::Relaxed);
423
424            if let Some(tpu_client) = tpu_client {
425                // retry sending transaction only over TPU port
426                // any transactions sent over RPC will be automatically rebroadcast by the RPC server
427                let txs_to_resend_over_tpu = unconfirmed_transaction_map
428                    .iter()
429                    .filter(|x| block_height < x.last_valid_block_height)
430                    .map(|x| x.serialized_transaction.clone())
431                    .collect::<Vec<_>>();
432                send_staggered_transactions(
433                    progress_bar,
434                    tpu_client,
435                    txs_to_resend_over_tpu,
436                    max_valid_block_height,
437                    context,
438                )
439                .await;
440            } else {
441                tokio::time::sleep(Duration::from_millis(100)).await;
442            }
443            if let Some(max_valid_block_height_in_remaining_transaction) =
444                unconfirmed_transaction_map
445                    .iter()
446                    .map(|x| x.last_valid_block_height)
447                    .max()
448            {
449                max_valid_block_height = max_valid_block_height_in_remaining_transaction;
450            }
451        }
452    }
453}
454
455async fn send_staggered_transactions(
456    progress_bar: &Option<indicatif::ProgressBar>,
457    tpu_client: &QuicTpuClient,
458    wire_transactions: Vec<Vec<u8>>,
459    last_valid_block_height: u64,
460    context: &SendingContext,
461) {
462    let current_transaction_count = wire_transactions.len();
463    let futures = wire_transactions
464        .into_iter()
465        .enumerate()
466        .map(|(counter, transaction)| async move {
467            tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
468            if let Some(progress_bar) = progress_bar {
469                let progress =
470                    progress_from_context_and_block_height(context, last_valid_block_height);
471                progress.set_message_for_confirmed_transactions(
472                    progress_bar,
473                    &format!(
474                        "Resending {}/{} transactions",
475                        counter + 1,
476                        current_transaction_count,
477                    ),
478                );
479            }
480            tokio::time::timeout(
481                SEND_TIMEOUT_INTERVAL,
482                tpu_client.send_wire_transaction(transaction),
483            )
484            .await
485        })
486        .collect::<Vec<_>>();
487    join_all(futures).await;
488}
489
490/// Sends and confirms transactions concurrently
491///
492/// The sending and confirmation of transactions is done in parallel tasks
493/// The method signs transactions just before sending so that blockhash does not
494/// expire.
495pub async fn send_and_confirm_transactions_in_parallel_v2<T: Signers + ?Sized>(
496    rpc_client: Arc<RpcClient>,
497    tpu_client: Option<QuicTpuClient>,
498    messages: &[Message],
499    signers: &T,
500    config: SendAndConfirmConfigV2,
501) -> Result<Vec<Option<TransactionError>>> {
502    // get current blockhash and corresponding last valid block height
503    let (blockhash, last_valid_block_height) = rpc_client
504        .get_latest_blockhash_with_commitment(rpc_client.commitment())
505        .await?;
506    let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
507        blockhash,
508        last_valid_block_height,
509    }));
510
511    // check if all the messages are signable by the signers
512    messages
513        .iter()
514        .map(|x| {
515            let mut transaction = Transaction::new_unsigned(x.clone());
516            transaction.try_sign(signers, blockhash)
517        })
518        .collect::<std::result::Result<Vec<()>, SignerError>>()?;
519
520    // get current block height
521    let block_height = rpc_client.get_block_height().await?;
522    let current_block_height = Arc::new(AtomicU64::new(block_height));
523
524    let progress_bar = config.with_spinner.then(|| {
525        let progress_bar = spinner::new_progress_bar();
526        progress_bar.set_message("Setting up...");
527        progress_bar
528    });
529
530    // blockhash and block height update task
531    let block_data_task = create_blockhash_data_updating_task(
532        rpc_client.clone(),
533        blockhash_data_rw.clone(),
534        current_block_height.clone(),
535    );
536
537    let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
538    let error_map = Arc::new(DashMap::new());
539    let num_confirmed_transactions = Arc::new(AtomicUsize::new(0));
540    // tasks which confirms the transactions that were sent
541    let transaction_confirming_task = create_transaction_confirmation_task(
542        rpc_client.clone(),
543        current_block_height.clone(),
544        unconfirmed_transasction_map.clone(),
545        error_map.clone(),
546        num_confirmed_transactions.clone(),
547    );
548
549    // transaction sender task
550    let total_transactions = messages.len();
551    let mut initial = true;
552    let signing_count = config.resign_txs_count.unwrap_or(1);
553    let context = SendingContext {
554        unconfirmed_transaction_map: unconfirmed_transasction_map.clone(),
555        blockhash_data_rw: blockhash_data_rw.clone(),
556        num_confirmed_transactions: num_confirmed_transactions.clone(),
557        current_block_height: current_block_height.clone(),
558        error_map: error_map.clone(),
559        total_transactions,
560    };
561
562    for expired_blockhash_retries in (0..signing_count).rev() {
563        // only send messages which have not been confirmed
564        let messages_with_index: Vec<(usize, Message)> = if initial {
565            initial = false;
566            messages.iter().cloned().enumerate().collect()
567        } else {
568            // remove all the confirmed transactions
569            unconfirmed_transasction_map
570                .iter()
571                .map(|x| (x.index, x.message.clone()))
572                .collect()
573        };
574
575        if messages_with_index.is_empty() {
576            break;
577        }
578
579        // clear the map so that we can start resending
580        unconfirmed_transasction_map.clear();
581
582        sign_all_messages_and_send(
583            &progress_bar,
584            &rpc_client,
585            &tpu_client,
586            messages_with_index,
587            signers,
588            &context,
589            config.rpc_send_transaction_config,
590        )
591        .await?;
592        confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
593            &progress_bar,
594            &tpu_client,
595            &context,
596        )
597        .await;
598
599        if unconfirmed_transasction_map.is_empty() {
600            break;
601        }
602
603        if let Some(progress_bar) = &progress_bar {
604            progress_bar.println(format!(
605                "Blockhash expired. {expired_blockhash_retries} retries remaining"
606            ));
607        }
608    }
609
610    block_data_task.abort();
611    transaction_confirming_task.abort();
612    if unconfirmed_transasction_map.is_empty() {
613        let mut transaction_errors = vec![None; messages.len()];
614        for iterator in error_map.iter() {
615            transaction_errors[*iterator.key()] = Some(iterator.value().clone());
616        }
617        Ok(transaction_errors)
618    } else {
619        Err(TpuSenderError::Custom("Max retries exceeded".into()))
620    }
621}