solana_banks_server/
banks_server.rs

1use {
2    bincode::{deserialize, serialize},
3    crossbeam_channel::{unbounded, Receiver, Sender},
4    futures::{future, prelude::stream::StreamExt},
5    solana_banks_interface::{
6        Banks, BanksRequest, BanksResponse, BanksTransactionResultWithMetadata,
7        BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionMetadata,
8        TransactionSimulationDetails, TransactionStatus,
9    },
10    solana_client::connection_cache::ConnectionCache,
11    solana_feature_set::{move_precompile_verification_to_svm, FeatureSet},
12    solana_runtime::{
13        bank::{Bank, TransactionSimulationResult},
14        bank_forks::BankForks,
15        commitment::BlockCommitmentCache,
16        verify_precompiles::verify_precompiles,
17    },
18    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
19    solana_sdk::{
20        account::Account,
21        clock::Slot,
22        commitment_config::CommitmentLevel,
23        hash::Hash,
24        message::{Message, SanitizedMessage},
25        pubkey::Pubkey,
26        signature::Signature,
27        transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction},
28    },
29    solana_send_transaction_service::{
30        send_transaction_service::{SendTransactionService, TransactionInfo},
31        tpu_info::NullTpuInfo,
32    },
33    std::{
34        io,
35        net::{Ipv4Addr, SocketAddr},
36        sync::{atomic::AtomicBool, Arc, RwLock},
37        thread::Builder,
38        time::Duration,
39    },
40    tarpc::{
41        context::Context,
42        serde_transport::tcp,
43        server::{self, incoming::Incoming, Channel},
44        transport::{self, channel::UnboundedChannel},
45        ClientMessage, Response,
46    },
47    tokio::time::sleep,
48    tokio_serde::formats::Bincode,
49};
50
51#[derive(Clone)]
52struct BanksServer {
53    bank_forks: Arc<RwLock<BankForks>>,
54    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
55    transaction_sender: Sender<TransactionInfo>,
56    poll_signature_status_sleep_duration: Duration,
57}
58
59impl BanksServer {
60    /// Return a BanksServer that forwards transactions to the
61    /// given sender. If unit-testing, those transactions can go to
62    /// a bank in the given BankForks. Otherwise, the receiver should
63    /// forward them to a validator in the leader schedule.
64    fn new(
65        bank_forks: Arc<RwLock<BankForks>>,
66        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
67        transaction_sender: Sender<TransactionInfo>,
68        poll_signature_status_sleep_duration: Duration,
69    ) -> Self {
70        Self {
71            bank_forks,
72            block_commitment_cache,
73            transaction_sender,
74            poll_signature_status_sleep_duration,
75        }
76    }
77
78    fn run(bank_forks: Arc<RwLock<BankForks>>, transaction_receiver: Receiver<TransactionInfo>) {
79        while let Ok(info) = transaction_receiver.recv() {
80            let mut transaction_infos = vec![info];
81            while let Ok(info) = transaction_receiver.try_recv() {
82                transaction_infos.push(info);
83            }
84            let transactions: Vec<_> = transaction_infos
85                .into_iter()
86                .map(|info| deserialize(&info.wire_transaction).unwrap())
87                .collect();
88            loop {
89                let bank = bank_forks.read().unwrap().working_bank();
90                // bank forks lock released, now verify bank hasn't been frozen yet
91                // in the mean-time the bank can not be frozen until this tx batch
92                // has been processed
93                let lock = bank.freeze_lock();
94                if *lock == Hash::default() {
95                    let _ = bank.try_process_entry_transactions(transactions);
96                    // break out of inner loop and release bank freeze lock
97                    break;
98                }
99            }
100        }
101    }
102
103    /// Useful for unit-testing
104    fn new_loopback(
105        bank_forks: Arc<RwLock<BankForks>>,
106        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
107        poll_signature_status_sleep_duration: Duration,
108    ) -> Self {
109        let (transaction_sender, transaction_receiver) = unbounded();
110        let bank = bank_forks.read().unwrap().working_bank();
111        let slot = bank.slot();
112        {
113            // ensure that the commitment cache and bank are synced
114            let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
115            w_block_commitment_cache.set_all_slots(slot, slot);
116        }
117        let server_bank_forks = bank_forks.clone();
118        Builder::new()
119            .name("solBankForksCli".to_string())
120            .spawn(move || Self::run(server_bank_forks, transaction_receiver))
121            .unwrap();
122        Self::new(
123            bank_forks,
124            block_commitment_cache,
125            transaction_sender,
126            poll_signature_status_sleep_duration,
127        )
128    }
129
130    fn slot(&self, commitment: CommitmentLevel) -> Slot {
131        self.block_commitment_cache
132            .read()
133            .unwrap()
134            .slot_with_commitment(commitment)
135    }
136
137    fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
138        self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
139    }
140
141    async fn poll_signature_status(
142        self,
143        signature: &Signature,
144        blockhash: &Hash,
145        last_valid_block_height: u64,
146        commitment: CommitmentLevel,
147    ) -> Option<transaction::Result<()>> {
148        let mut status = self
149            .bank(commitment)
150            .get_signature_status_with_blockhash(signature, blockhash);
151        while status.is_none() {
152            sleep(self.poll_signature_status_sleep_duration).await;
153            let bank = self.bank(commitment);
154            if bank.block_height() > last_valid_block_height {
155                break;
156            }
157            status = bank.get_signature_status_with_blockhash(signature, blockhash);
158        }
159        status
160    }
161}
162
163fn verify_transaction(
164    transaction: &SanitizedTransaction,
165    feature_set: &Arc<FeatureSet>,
166) -> transaction::Result<()> {
167    transaction.verify()?;
168
169    let move_precompile_verification_to_svm =
170        feature_set.is_active(&move_precompile_verification_to_svm::id());
171    if !move_precompile_verification_to_svm {
172        verify_precompiles(transaction, feature_set)?;
173    }
174
175    Ok(())
176}
177
178fn simulate_transaction(
179    bank: &Bank,
180    transaction: VersionedTransaction,
181) -> BanksTransactionResultWithSimulation {
182    let sanitized_transaction = match RuntimeTransaction::try_create(
183        transaction,
184        MessageHash::Compute,
185        Some(false), // is_simple_vote_tx
186        bank,
187        bank.get_reserved_account_keys(),
188    ) {
189        Err(err) => {
190            return BanksTransactionResultWithSimulation {
191                result: Some(Err(err)),
192                simulation_details: None,
193            };
194        }
195        Ok(tx) => tx,
196    };
197    let TransactionSimulationResult {
198        result,
199        logs,
200        post_simulation_accounts: _,
201        units_consumed,
202        return_data,
203        inner_instructions,
204    } = bank.simulate_transaction_unchecked(&sanitized_transaction, true);
205
206    let simulation_details = TransactionSimulationDetails {
207        logs,
208        units_consumed,
209        return_data,
210        inner_instructions,
211    };
212    BanksTransactionResultWithSimulation {
213        result: Some(result),
214        simulation_details: Some(simulation_details),
215    }
216}
217
218#[tarpc::server]
219impl Banks for BanksServer {
220    async fn send_transaction_with_context(self, _: Context, transaction: VersionedTransaction) {
221        let blockhash = transaction.message.recent_blockhash();
222        let last_valid_block_height = self
223            .bank_forks
224            .read()
225            .unwrap()
226            .root_bank()
227            .get_blockhash_last_valid_block_height(blockhash)
228            .unwrap();
229        let signature = transaction.signatures.first().cloned().unwrap_or_default();
230        let info = TransactionInfo::new(
231            signature,
232            serialize(&transaction).unwrap(),
233            last_valid_block_height,
234            None,
235            None,
236            None,
237        );
238        self.transaction_sender.send(info).unwrap();
239    }
240
241    async fn get_transaction_status_with_context(
242        self,
243        _: Context,
244        signature: Signature,
245    ) -> Option<TransactionStatus> {
246        let bank = self.bank(CommitmentLevel::Processed);
247        let (slot, status) = bank.get_signature_status_slot(&signature)?;
248        let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
249
250        let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed);
251        let optimistically_confirmed =
252            optimistically_confirmed_bank.get_signature_status_slot(&signature);
253
254        let confirmations = if r_block_commitment_cache.root() >= slot
255            && r_block_commitment_cache.highest_super_majority_root() >= slot
256        {
257            None
258        } else {
259            r_block_commitment_cache
260                .get_confirmation_count(slot)
261                .or(Some(0))
262        };
263        Some(TransactionStatus {
264            slot,
265            confirmations,
266            err: status.err(),
267            confirmation_status: if confirmations.is_none() {
268                Some(TransactionConfirmationStatus::Finalized)
269            } else if optimistically_confirmed.is_some() {
270                Some(TransactionConfirmationStatus::Confirmed)
271            } else {
272                Some(TransactionConfirmationStatus::Processed)
273            },
274        })
275    }
276
277    async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
278        self.slot(commitment)
279    }
280
281    async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 {
282        self.bank(commitment).block_height()
283    }
284
285    async fn process_transaction_with_preflight_and_commitment_and_context(
286        self,
287        ctx: Context,
288        transaction: VersionedTransaction,
289        commitment: CommitmentLevel,
290    ) -> BanksTransactionResultWithSimulation {
291        let mut simulation_result =
292            simulate_transaction(&self.bank(commitment), transaction.clone());
293        // Simulation was ok, so process the real transaction and replace the
294        // simulation's result with the real transaction result
295        if let Some(Ok(_)) = simulation_result.result {
296            simulation_result.result = self
297                .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
298                .await;
299        }
300        simulation_result
301    }
302
303    async fn simulate_transaction_with_commitment_and_context(
304        self,
305        _: Context,
306        transaction: VersionedTransaction,
307        commitment: CommitmentLevel,
308    ) -> BanksTransactionResultWithSimulation {
309        simulate_transaction(&self.bank(commitment), transaction)
310    }
311
312    async fn process_transaction_with_commitment_and_context(
313        self,
314        _: Context,
315        transaction: VersionedTransaction,
316        commitment: CommitmentLevel,
317    ) -> Option<transaction::Result<()>> {
318        let bank = self.bank(commitment);
319        let sanitized_transaction = match SanitizedTransaction::try_create(
320            transaction.clone(),
321            MessageHash::Compute,
322            Some(false), // is_simple_vote_tx
323            bank.as_ref(),
324            bank.get_reserved_account_keys(),
325        ) {
326            Ok(tx) => tx,
327            Err(err) => return Some(Err(err)),
328        };
329
330        if let Err(err) = verify_transaction(&sanitized_transaction, &bank.feature_set) {
331            return Some(Err(err));
332        }
333
334        let blockhash = transaction.message.recent_blockhash();
335        let last_valid_block_height = self
336            .bank(commitment)
337            .get_blockhash_last_valid_block_height(blockhash)
338            .unwrap();
339        let signature = sanitized_transaction.signature();
340        let info = TransactionInfo::new(
341            *signature,
342            serialize(&transaction).unwrap(),
343            last_valid_block_height,
344            None,
345            None,
346            None,
347        );
348        self.transaction_sender.send(info).unwrap();
349        self.poll_signature_status(signature, blockhash, last_valid_block_height, commitment)
350            .await
351    }
352
353    async fn process_transaction_with_metadata_and_context(
354        self,
355        _: Context,
356        transaction: VersionedTransaction,
357    ) -> BanksTransactionResultWithMetadata {
358        let bank = self.bank_forks.read().unwrap().working_bank();
359        match bank.process_transaction_with_metadata(transaction) {
360            Err(error) => BanksTransactionResultWithMetadata {
361                result: Err(error),
362                metadata: None,
363            },
364            Ok(details) => BanksTransactionResultWithMetadata {
365                result: details.status,
366                metadata: Some(TransactionMetadata {
367                    compute_units_consumed: details.executed_units,
368                    log_messages: details.log_messages.unwrap_or_default(),
369                    return_data: details.return_data,
370                }),
371            },
372        }
373    }
374
375    async fn get_account_with_commitment_and_context(
376        self,
377        _: Context,
378        address: Pubkey,
379        commitment: CommitmentLevel,
380    ) -> Option<Account> {
381        let bank = self.bank(commitment);
382        bank.get_account(&address).map(Account::from)
383    }
384
385    async fn get_latest_blockhash_with_context(self, _: Context) -> Hash {
386        let bank = self.bank(CommitmentLevel::default());
387        bank.last_blockhash()
388    }
389
390    async fn get_latest_blockhash_with_commitment_and_context(
391        self,
392        _: Context,
393        commitment: CommitmentLevel,
394    ) -> Option<(Hash, u64)> {
395        let bank = self.bank(commitment);
396        let blockhash = bank.last_blockhash();
397        let last_valid_block_height = bank.get_blockhash_last_valid_block_height(&blockhash)?;
398        Some((blockhash, last_valid_block_height))
399    }
400
401    async fn get_fee_for_message_with_commitment_and_context(
402        self,
403        _: Context,
404        message: Message,
405        commitment: CommitmentLevel,
406    ) -> Option<u64> {
407        let bank = self.bank(commitment);
408        let sanitized_message =
409            SanitizedMessage::try_from_legacy_message(message, bank.get_reserved_account_keys())
410                .ok()?;
411        bank.get_fee_for_message(&sanitized_message)
412    }
413}
414
415pub async fn start_local_server(
416    bank_forks: Arc<RwLock<BankForks>>,
417    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
418    poll_signature_status_sleep_duration: Duration,
419) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
420    let banks_server = BanksServer::new_loopback(
421        bank_forks,
422        block_commitment_cache,
423        poll_signature_status_sleep_duration,
424    );
425    let (client_transport, server_transport) = transport::channel::unbounded();
426    let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve());
427    tokio::spawn(server);
428    client_transport
429}
430
431pub async fn start_tcp_server(
432    listen_addr: SocketAddr,
433    tpu_addr: SocketAddr,
434    bank_forks: Arc<RwLock<BankForks>>,
435    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
436    connection_cache: Arc<ConnectionCache>,
437    exit: Arc<AtomicBool>,
438) -> io::Result<()> {
439    // Note: These settings are copied straight from the tarpc example.
440    let server = tcp::listen(listen_addr, Bincode::default)
441        .await?
442        // Ignore accept errors.
443        .filter_map(|r| future::ready(r.ok()))
444        .map(server::BaseChannel::with_defaults)
445        // Limit channels to 1 per IP.
446        .max_channels_per_key(1, |t| {
447            t.as_ref()
448                .peer_addr()
449                .map(|x| x.ip())
450                .unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into())
451        })
452        // serve is generated by the service attribute. It takes as input any type implementing
453        // the generated Banks trait.
454        .map(move |chan| {
455            let (sender, receiver) = unbounded();
456
457            SendTransactionService::new::<NullTpuInfo>(
458                tpu_addr,
459                &bank_forks,
460                None,
461                receiver,
462                &connection_cache,
463                5_000,
464                0,
465                exit.clone(),
466            );
467
468            let server = BanksServer::new(
469                bank_forks.clone(),
470                block_commitment_cache.clone(),
471                sender,
472                Duration::from_millis(200),
473            );
474            chan.execute(server.serve())
475        })
476        // Max 10 channels.
477        .buffer_unordered(10)
478        .for_each(|_| async {});
479
480    server.await;
481    Ok(())
482}