solana_banks_server/
banks_server.rs

1use {
2    bincode::{deserialize, serialize},
3    crossbeam_channel::{unbounded, Receiver, Sender},
4    futures::{future, prelude::stream::StreamExt},
5    solana_banks_interface::{
6        Banks, BanksRequest, BanksResponse, BanksTransactionResultWithMetadata,
7        BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionMetadata,
8        TransactionSimulationDetails, TransactionStatus,
9    },
10    solana_client::connection_cache::ConnectionCache,
11    solana_feature_set::{move_precompile_verification_to_svm, FeatureSet},
12    solana_runtime::{
13        bank::{Bank, TransactionSimulationResult},
14        bank_forks::BankForks,
15        commitment::BlockCommitmentCache,
16        verify_precompiles::verify_precompiles,
17    },
18    solana_sdk::{
19        account::Account,
20        clock::Slot,
21        commitment_config::CommitmentLevel,
22        hash::Hash,
23        message::{Message, SanitizedMessage},
24        pubkey::Pubkey,
25        signature::Signature,
26        transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction},
27    },
28    solana_send_transaction_service::{
29        send_transaction_service::{SendTransactionService, TransactionInfo},
30        tpu_info::NullTpuInfo,
31    },
32    std::{
33        io,
34        net::{Ipv4Addr, SocketAddr},
35        sync::{atomic::AtomicBool, Arc, RwLock},
36        thread::Builder,
37        time::Duration,
38    },
39    tarpc::{
40        context::Context,
41        serde_transport::tcp,
42        server::{self, incoming::Incoming, Channel},
43        transport::{self, channel::UnboundedChannel},
44        ClientMessage, Response,
45    },
46    tokio::time::sleep,
47    tokio_serde::formats::Bincode,
48};
49
50#[derive(Clone)]
51struct BanksServer {
52    bank_forks: Arc<RwLock<BankForks>>,
53    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
54    transaction_sender: Sender<TransactionInfo>,
55    poll_signature_status_sleep_duration: Duration,
56}
57
58impl BanksServer {
59    /// Return a BanksServer that forwards transactions to the
60    /// given sender. If unit-testing, those transactions can go to
61    /// a bank in the given BankForks. Otherwise, the receiver should
62    /// forward them to a validator in the leader schedule.
63    fn new(
64        bank_forks: Arc<RwLock<BankForks>>,
65        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
66        transaction_sender: Sender<TransactionInfo>,
67        poll_signature_status_sleep_duration: Duration,
68    ) -> Self {
69        Self {
70            bank_forks,
71            block_commitment_cache,
72            transaction_sender,
73            poll_signature_status_sleep_duration,
74        }
75    }
76
77    fn run(bank_forks: Arc<RwLock<BankForks>>, transaction_receiver: Receiver<TransactionInfo>) {
78        while let Ok(info) = transaction_receiver.recv() {
79            let mut transaction_infos = vec![info];
80            while let Ok(info) = transaction_receiver.try_recv() {
81                transaction_infos.push(info);
82            }
83            let transactions: Vec<_> = transaction_infos
84                .into_iter()
85                .map(|info| deserialize(&info.wire_transaction).unwrap())
86                .collect();
87            loop {
88                let bank = bank_forks.read().unwrap().working_bank();
89                // bank forks lock released, now verify bank hasn't been frozen yet
90                // in the mean-time the bank can not be frozen until this tx batch
91                // has been processed
92                let lock = bank.freeze_lock();
93                if *lock == Hash::default() {
94                    let _ = bank.try_process_entry_transactions(transactions);
95                    // break out of inner loop and release bank freeze lock
96                    break;
97                }
98            }
99        }
100    }
101
102    /// Useful for unit-testing
103    fn new_loopback(
104        bank_forks: Arc<RwLock<BankForks>>,
105        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
106        poll_signature_status_sleep_duration: Duration,
107    ) -> Self {
108        let (transaction_sender, transaction_receiver) = unbounded();
109        let bank = bank_forks.read().unwrap().working_bank();
110        let slot = bank.slot();
111        {
112            // ensure that the commitment cache and bank are synced
113            let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
114            w_block_commitment_cache.set_all_slots(slot, slot);
115        }
116        let server_bank_forks = bank_forks.clone();
117        Builder::new()
118            .name("solBankForksCli".to_string())
119            .spawn(move || Self::run(server_bank_forks, transaction_receiver))
120            .unwrap();
121        Self::new(
122            bank_forks,
123            block_commitment_cache,
124            transaction_sender,
125            poll_signature_status_sleep_duration,
126        )
127    }
128
129    fn slot(&self, commitment: CommitmentLevel) -> Slot {
130        self.block_commitment_cache
131            .read()
132            .unwrap()
133            .slot_with_commitment(commitment)
134    }
135
136    fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
137        self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
138    }
139
140    async fn poll_signature_status(
141        self,
142        signature: &Signature,
143        blockhash: &Hash,
144        last_valid_block_height: u64,
145        commitment: CommitmentLevel,
146    ) -> Option<transaction::Result<()>> {
147        let mut status = self
148            .bank(commitment)
149            .get_signature_status_with_blockhash(signature, blockhash);
150        while status.is_none() {
151            sleep(self.poll_signature_status_sleep_duration).await;
152            let bank = self.bank(commitment);
153            if bank.block_height() > last_valid_block_height {
154                break;
155            }
156            status = bank.get_signature_status_with_blockhash(signature, blockhash);
157        }
158        status
159    }
160}
161
162fn verify_transaction(
163    transaction: &SanitizedTransaction,
164    feature_set: &Arc<FeatureSet>,
165) -> transaction::Result<()> {
166    transaction.verify()?;
167
168    let move_precompile_verification_to_svm =
169        feature_set.is_active(&move_precompile_verification_to_svm::id());
170    if !move_precompile_verification_to_svm {
171        verify_precompiles(transaction, feature_set)?;
172    }
173
174    Ok(())
175}
176
177fn simulate_transaction(
178    bank: &Bank,
179    transaction: VersionedTransaction,
180) -> BanksTransactionResultWithSimulation {
181    let sanitized_transaction = match SanitizedTransaction::try_create(
182        transaction,
183        MessageHash::Compute,
184        Some(false), // is_simple_vote_tx
185        bank,
186        bank.get_reserved_account_keys(),
187    ) {
188        Err(err) => {
189            return BanksTransactionResultWithSimulation {
190                result: Some(Err(err)),
191                simulation_details: None,
192            };
193        }
194        Ok(tx) => tx,
195    };
196    let TransactionSimulationResult {
197        result,
198        logs,
199        post_simulation_accounts: _,
200        units_consumed,
201        return_data,
202        inner_instructions,
203    } = bank.simulate_transaction_unchecked(&sanitized_transaction, true);
204
205    let simulation_details = TransactionSimulationDetails {
206        logs,
207        units_consumed,
208        return_data,
209        inner_instructions,
210    };
211    BanksTransactionResultWithSimulation {
212        result: Some(result),
213        simulation_details: Some(simulation_details),
214    }
215}
216
217#[tarpc::server]
218impl Banks for BanksServer {
219    async fn send_transaction_with_context(self, _: Context, transaction: VersionedTransaction) {
220        let blockhash = transaction.message.recent_blockhash();
221        let last_valid_block_height = self
222            .bank_forks
223            .read()
224            .unwrap()
225            .root_bank()
226            .get_blockhash_last_valid_block_height(blockhash)
227            .unwrap();
228        let signature = transaction.signatures.first().cloned().unwrap_or_default();
229        let info = TransactionInfo::new(
230            signature,
231            serialize(&transaction).unwrap(),
232            last_valid_block_height,
233            None,
234            None,
235            None,
236        );
237        self.transaction_sender.send(info).unwrap();
238    }
239
240    async fn get_transaction_status_with_context(
241        self,
242        _: Context,
243        signature: Signature,
244    ) -> Option<TransactionStatus> {
245        let bank = self.bank(CommitmentLevel::Processed);
246        let (slot, status) = bank.get_signature_status_slot(&signature)?;
247        let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
248
249        let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed);
250        let optimistically_confirmed =
251            optimistically_confirmed_bank.get_signature_status_slot(&signature);
252
253        let confirmations = if r_block_commitment_cache.root() >= slot
254            && r_block_commitment_cache.highest_super_majority_root() >= slot
255        {
256            None
257        } else {
258            r_block_commitment_cache
259                .get_confirmation_count(slot)
260                .or(Some(0))
261        };
262        Some(TransactionStatus {
263            slot,
264            confirmations,
265            err: status.err(),
266            confirmation_status: if confirmations.is_none() {
267                Some(TransactionConfirmationStatus::Finalized)
268            } else if optimistically_confirmed.is_some() {
269                Some(TransactionConfirmationStatus::Confirmed)
270            } else {
271                Some(TransactionConfirmationStatus::Processed)
272            },
273        })
274    }
275
276    async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
277        self.slot(commitment)
278    }
279
280    async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 {
281        self.bank(commitment).block_height()
282    }
283
284    async fn process_transaction_with_preflight_and_commitment_and_context(
285        self,
286        ctx: Context,
287        transaction: VersionedTransaction,
288        commitment: CommitmentLevel,
289    ) -> BanksTransactionResultWithSimulation {
290        let mut simulation_result =
291            simulate_transaction(&self.bank(commitment), transaction.clone());
292        // Simulation was ok, so process the real transaction and replace the
293        // simulation's result with the real transaction result
294        if let Some(Ok(_)) = simulation_result.result {
295            simulation_result.result = self
296                .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
297                .await;
298        }
299        simulation_result
300    }
301
302    async fn simulate_transaction_with_commitment_and_context(
303        self,
304        _: Context,
305        transaction: VersionedTransaction,
306        commitment: CommitmentLevel,
307    ) -> BanksTransactionResultWithSimulation {
308        simulate_transaction(&self.bank(commitment), transaction)
309    }
310
311    async fn process_transaction_with_commitment_and_context(
312        self,
313        _: Context,
314        transaction: VersionedTransaction,
315        commitment: CommitmentLevel,
316    ) -> Option<transaction::Result<()>> {
317        let bank = self.bank(commitment);
318        let sanitized_transaction = match SanitizedTransaction::try_create(
319            transaction.clone(),
320            MessageHash::Compute,
321            Some(false), // is_simple_vote_tx
322            bank.as_ref(),
323            bank.get_reserved_account_keys(),
324        ) {
325            Ok(tx) => tx,
326            Err(err) => return Some(Err(err)),
327        };
328
329        if let Err(err) = verify_transaction(&sanitized_transaction, &bank.feature_set) {
330            return Some(Err(err));
331        }
332
333        let blockhash = transaction.message.recent_blockhash();
334        let last_valid_block_height = self
335            .bank(commitment)
336            .get_blockhash_last_valid_block_height(blockhash)
337            .unwrap();
338        let signature = sanitized_transaction.signature();
339        let info = TransactionInfo::new(
340            *signature,
341            serialize(&transaction).unwrap(),
342            last_valid_block_height,
343            None,
344            None,
345            None,
346        );
347        self.transaction_sender.send(info).unwrap();
348        self.poll_signature_status(signature, blockhash, last_valid_block_height, commitment)
349            .await
350    }
351
352    async fn process_transaction_with_metadata_and_context(
353        self,
354        _: Context,
355        transaction: VersionedTransaction,
356    ) -> BanksTransactionResultWithMetadata {
357        let bank = self.bank_forks.read().unwrap().working_bank();
358        match bank.process_transaction_with_metadata(transaction) {
359            Err(error) => BanksTransactionResultWithMetadata {
360                result: Err(error),
361                metadata: None,
362            },
363            Ok(details) => BanksTransactionResultWithMetadata {
364                result: details.status,
365                metadata: Some(TransactionMetadata {
366                    compute_units_consumed: details.executed_units,
367                    log_messages: details.log_messages.unwrap_or_default(),
368                    return_data: details.return_data,
369                }),
370            },
371        }
372    }
373
374    async fn get_account_with_commitment_and_context(
375        self,
376        _: Context,
377        address: Pubkey,
378        commitment: CommitmentLevel,
379    ) -> Option<Account> {
380        let bank = self.bank(commitment);
381        bank.get_account(&address).map(Account::from)
382    }
383
384    async fn get_latest_blockhash_with_context(self, _: Context) -> Hash {
385        let bank = self.bank(CommitmentLevel::default());
386        bank.last_blockhash()
387    }
388
389    async fn get_latest_blockhash_with_commitment_and_context(
390        self,
391        _: Context,
392        commitment: CommitmentLevel,
393    ) -> Option<(Hash, u64)> {
394        let bank = self.bank(commitment);
395        let blockhash = bank.last_blockhash();
396        let last_valid_block_height = bank.get_blockhash_last_valid_block_height(&blockhash)?;
397        Some((blockhash, last_valid_block_height))
398    }
399
400    async fn get_fee_for_message_with_commitment_and_context(
401        self,
402        _: Context,
403        message: Message,
404        commitment: CommitmentLevel,
405    ) -> Option<u64> {
406        let bank = self.bank(commitment);
407        let sanitized_message =
408            SanitizedMessage::try_from_legacy_message(message, bank.get_reserved_account_keys())
409                .ok()?;
410        bank.get_fee_for_message(&sanitized_message)
411    }
412}
413
414pub async fn start_local_server(
415    bank_forks: Arc<RwLock<BankForks>>,
416    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
417    poll_signature_status_sleep_duration: Duration,
418) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
419    let banks_server = BanksServer::new_loopback(
420        bank_forks,
421        block_commitment_cache,
422        poll_signature_status_sleep_duration,
423    );
424    let (client_transport, server_transport) = transport::channel::unbounded();
425    let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve());
426    tokio::spawn(server);
427    client_transport
428}
429
430pub async fn start_tcp_server(
431    listen_addr: SocketAddr,
432    tpu_addr: SocketAddr,
433    bank_forks: Arc<RwLock<BankForks>>,
434    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
435    connection_cache: Arc<ConnectionCache>,
436    exit: Arc<AtomicBool>,
437) -> io::Result<()> {
438    // Note: These settings are copied straight from the tarpc example.
439    let server = tcp::listen(listen_addr, Bincode::default)
440        .await?
441        // Ignore accept errors.
442        .filter_map(|r| future::ready(r.ok()))
443        .map(server::BaseChannel::with_defaults)
444        // Limit channels to 1 per IP.
445        .max_channels_per_key(1, |t| {
446            t.as_ref()
447                .peer_addr()
448                .map(|x| x.ip())
449                .unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into())
450        })
451        // serve is generated by the service attribute. It takes as input any type implementing
452        // the generated Banks trait.
453        .map(move |chan| {
454            let (sender, receiver) = unbounded();
455
456            SendTransactionService::new::<NullTpuInfo>(
457                tpu_addr,
458                &bank_forks,
459                None,
460                receiver,
461                &connection_cache,
462                5_000,
463                0,
464                exit.clone(),
465            );
466
467            let server = BanksServer::new(
468                bank_forks.clone(),
469                block_commitment_cache.clone(),
470                sender,
471                Duration::from_millis(200),
472            );
473            chan.execute(server.serve())
474        })
475        // Max 10 channels.
476        .buffer_unordered(10)
477        .for_each(|_| async {});
478
479    server.await;
480    Ok(())
481}