solana_banks_server/
banks_server.rs

1use {
2    bincode::{deserialize, serialize},
3    crossbeam_channel::{unbounded, Receiver, Sender},
4    futures::{future, prelude::stream::StreamExt},
5    solana_banks_interface::{
6        Banks, BanksRequest, BanksResponse, BanksTransactionResultWithMetadata,
7        BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionMetadata,
8        TransactionSimulationDetails, TransactionStatus,
9    },
10    solana_client::connection_cache::ConnectionCache,
11    solana_feature_set::{move_precompile_verification_to_svm, FeatureSet},
12    solana_runtime::{
13        bank::{Bank, TransactionSimulationResult},
14        bank_forks::BankForks,
15        commitment::BlockCommitmentCache,
16        verify_precompiles::verify_precompiles,
17    },
18    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
19    solana_sdk::{
20        account::Account,
21        clock::Slot,
22        commitment_config::CommitmentLevel,
23        hash::Hash,
24        message::{Message, SanitizedMessage},
25        pubkey::Pubkey,
26        signature::Signature,
27        transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction},
28    },
29    solana_send_transaction_service::{
30        send_transaction_service::{Config, SendTransactionService, TransactionInfo},
31        tpu_info::NullTpuInfo,
32        transaction_client::ConnectionCacheClient,
33    },
34    std::{
35        io,
36        net::{Ipv4Addr, SocketAddr},
37        sync::{atomic::AtomicBool, Arc, RwLock},
38        thread::Builder,
39        time::Duration,
40    },
41    tarpc::{
42        context::Context,
43        serde_transport::tcp,
44        server::{self, incoming::Incoming, Channel},
45        transport::{self, channel::UnboundedChannel},
46        ClientMessage, Response,
47    },
48    tokio::time::sleep,
49    tokio_serde::formats::Bincode,
50};
51
/// Server-side state behind the `Banks` RPC service.
///
/// The struct is `Clone`; all heavyweight state is held behind `Arc`s or a
/// channel handle.
#[derive(Clone)]
struct BanksServer {
    /// Bank forks used to look up the bank for a slot, the working bank and
    /// the root bank.
    bank_forks: Arc<RwLock<BankForks>>,
    /// Cache used to translate a `CommitmentLevel` into a slot and to answer
    /// confirmation-count queries.
    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
    /// Channel on which submitted transactions are handed to whatever
    /// processes or forwards them.
    transaction_sender: Sender<TransactionInfo>,
    /// Pause between successive signature-status lookups while waiting for a
    /// transaction to be processed.
    poll_signature_status_sleep_duration: Duration,
}
59
60impl BanksServer {
61    /// Return a BanksServer that forwards transactions to the
62    /// given sender. If unit-testing, those transactions can go to
63    /// a bank in the given BankForks. Otherwise, the receiver should
64    /// forward them to a validator in the leader schedule.
65    fn new(
66        bank_forks: Arc<RwLock<BankForks>>,
67        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
68        transaction_sender: Sender<TransactionInfo>,
69        poll_signature_status_sleep_duration: Duration,
70    ) -> Self {
71        Self {
72            bank_forks,
73            block_commitment_cache,
74            transaction_sender,
75            poll_signature_status_sleep_duration,
76        }
77    }
78
79    fn run(bank_forks: Arc<RwLock<BankForks>>, transaction_receiver: Receiver<TransactionInfo>) {
80        while let Ok(info) = transaction_receiver.recv() {
81            let mut transaction_infos = vec![info];
82            while let Ok(info) = transaction_receiver.try_recv() {
83                transaction_infos.push(info);
84            }
85            let transactions: Vec<_> = transaction_infos
86                .into_iter()
87                .map(|info| deserialize(&info.wire_transaction).unwrap())
88                .collect();
89            loop {
90                let bank = bank_forks.read().unwrap().working_bank();
91                // bank forks lock released, now verify bank hasn't been frozen yet
92                // in the mean-time the bank can not be frozen until this tx batch
93                // has been processed
94                let lock = bank.freeze_lock();
95                if *lock == Hash::default() {
96                    let _ = bank.try_process_entry_transactions(transactions);
97                    // break out of inner loop and release bank freeze lock
98                    break;
99                }
100            }
101        }
102    }
103
104    /// Useful for unit-testing
105    fn new_loopback(
106        bank_forks: Arc<RwLock<BankForks>>,
107        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
108        poll_signature_status_sleep_duration: Duration,
109    ) -> Self {
110        let (transaction_sender, transaction_receiver) = unbounded();
111        let bank = bank_forks.read().unwrap().working_bank();
112        let slot = bank.slot();
113        {
114            // ensure that the commitment cache and bank are synced
115            let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
116            w_block_commitment_cache.set_all_slots(slot, slot);
117        }
118        let server_bank_forks = bank_forks.clone();
119        Builder::new()
120            .name("solBankForksCli".to_string())
121            .spawn(move || Self::run(server_bank_forks, transaction_receiver))
122            .unwrap();
123        Self::new(
124            bank_forks,
125            block_commitment_cache,
126            transaction_sender,
127            poll_signature_status_sleep_duration,
128        )
129    }
130
131    fn slot(&self, commitment: CommitmentLevel) -> Slot {
132        self.block_commitment_cache
133            .read()
134            .unwrap()
135            .slot_with_commitment(commitment)
136    }
137
138    fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
139        self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
140    }
141
142    async fn poll_signature_status(
143        self,
144        signature: &Signature,
145        blockhash: &Hash,
146        last_valid_block_height: u64,
147        commitment: CommitmentLevel,
148    ) -> Option<transaction::Result<()>> {
149        let mut status = self
150            .bank(commitment)
151            .get_signature_status_with_blockhash(signature, blockhash);
152        while status.is_none() {
153            sleep(self.poll_signature_status_sleep_duration).await;
154            let bank = self.bank(commitment);
155            if bank.block_height() > last_valid_block_height {
156                break;
157            }
158            status = bank.get_signature_status_with_blockhash(signature, blockhash);
159        }
160        status
161    }
162}
163
164fn verify_transaction(
165    transaction: &SanitizedTransaction,
166    feature_set: &Arc<FeatureSet>,
167) -> transaction::Result<()> {
168    transaction.verify()?;
169
170    let move_precompile_verification_to_svm =
171        feature_set.is_active(&move_precompile_verification_to_svm::id());
172    if !move_precompile_verification_to_svm {
173        verify_precompiles(transaction, feature_set)?;
174    }
175
176    Ok(())
177}
178
179fn simulate_transaction(
180    bank: &Bank,
181    transaction: VersionedTransaction,
182) -> BanksTransactionResultWithSimulation {
183    let sanitized_transaction = match RuntimeTransaction::try_create(
184        transaction,
185        MessageHash::Compute,
186        Some(false), // is_simple_vote_tx
187        bank,
188        bank.get_reserved_account_keys(),
189    ) {
190        Err(err) => {
191            return BanksTransactionResultWithSimulation {
192                result: Some(Err(err)),
193                simulation_details: None,
194            };
195        }
196        Ok(tx) => tx,
197    };
198    let TransactionSimulationResult {
199        result,
200        logs,
201        post_simulation_accounts: _,
202        units_consumed,
203        return_data,
204        inner_instructions,
205    } = bank.simulate_transaction_unchecked(&sanitized_transaction, true);
206
207    let simulation_details = TransactionSimulationDetails {
208        logs,
209        units_consumed,
210        return_data,
211        inner_instructions,
212    };
213    BanksTransactionResultWithSimulation {
214        result: Some(result),
215        simulation_details: Some(simulation_details),
216    }
217}
218
219#[tarpc::server]
220impl Banks for BanksServer {
221    async fn send_transaction_with_context(self, _: Context, transaction: VersionedTransaction) {
222        let blockhash = transaction.message.recent_blockhash();
223        let last_valid_block_height = self
224            .bank_forks
225            .read()
226            .unwrap()
227            .root_bank()
228            .get_blockhash_last_valid_block_height(blockhash)
229            .unwrap();
230        let signature = transaction.signatures.first().cloned().unwrap_or_default();
231        let info = TransactionInfo::new(
232            signature,
233            serialize(&transaction).unwrap(),
234            last_valid_block_height,
235            None,
236            None,
237            None,
238        );
239        self.transaction_sender.send(info).unwrap();
240    }
241
242    async fn get_transaction_status_with_context(
243        self,
244        _: Context,
245        signature: Signature,
246    ) -> Option<TransactionStatus> {
247        let bank = self.bank(CommitmentLevel::Processed);
248        let (slot, status) = bank.get_signature_status_slot(&signature)?;
249        let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
250
251        let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed);
252        let optimistically_confirmed =
253            optimistically_confirmed_bank.get_signature_status_slot(&signature);
254
255        let confirmations = if r_block_commitment_cache.root() >= slot
256            && r_block_commitment_cache.highest_super_majority_root() >= slot
257        {
258            None
259        } else {
260            r_block_commitment_cache
261                .get_confirmation_count(slot)
262                .or(Some(0))
263        };
264        Some(TransactionStatus {
265            slot,
266            confirmations,
267            err: status.err(),
268            confirmation_status: if confirmations.is_none() {
269                Some(TransactionConfirmationStatus::Finalized)
270            } else if optimistically_confirmed.is_some() {
271                Some(TransactionConfirmationStatus::Confirmed)
272            } else {
273                Some(TransactionConfirmationStatus::Processed)
274            },
275        })
276    }
277
278    async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
279        self.slot(commitment)
280    }
281
282    async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 {
283        self.bank(commitment).block_height()
284    }
285
286    async fn process_transaction_with_preflight_and_commitment_and_context(
287        self,
288        ctx: Context,
289        transaction: VersionedTransaction,
290        commitment: CommitmentLevel,
291    ) -> BanksTransactionResultWithSimulation {
292        let mut simulation_result =
293            simulate_transaction(&self.bank(commitment), transaction.clone());
294        // Simulation was ok, so process the real transaction and replace the
295        // simulation's result with the real transaction result
296        if let Some(Ok(_)) = simulation_result.result {
297            simulation_result.result = self
298                .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
299                .await;
300        }
301        simulation_result
302    }
303
304    async fn simulate_transaction_with_commitment_and_context(
305        self,
306        _: Context,
307        transaction: VersionedTransaction,
308        commitment: CommitmentLevel,
309    ) -> BanksTransactionResultWithSimulation {
310        simulate_transaction(&self.bank(commitment), transaction)
311    }
312
313    async fn process_transaction_with_commitment_and_context(
314        self,
315        _: Context,
316        transaction: VersionedTransaction,
317        commitment: CommitmentLevel,
318    ) -> Option<transaction::Result<()>> {
319        let bank = self.bank(commitment);
320        let sanitized_transaction = match SanitizedTransaction::try_create(
321            transaction.clone(),
322            MessageHash::Compute,
323            Some(false), // is_simple_vote_tx
324            bank.as_ref(),
325            bank.get_reserved_account_keys(),
326        ) {
327            Ok(tx) => tx,
328            Err(err) => return Some(Err(err)),
329        };
330
331        if let Err(err) = verify_transaction(&sanitized_transaction, &bank.feature_set) {
332            return Some(Err(err));
333        }
334
335        let blockhash = transaction.message.recent_blockhash();
336        let last_valid_block_height = self
337            .bank(commitment)
338            .get_blockhash_last_valid_block_height(blockhash)
339            .unwrap();
340        let signature = sanitized_transaction.signature();
341        let info = TransactionInfo::new(
342            *signature,
343            serialize(&transaction).unwrap(),
344            last_valid_block_height,
345            None,
346            None,
347            None,
348        );
349        self.transaction_sender.send(info).unwrap();
350        self.poll_signature_status(signature, blockhash, last_valid_block_height, commitment)
351            .await
352    }
353
354    async fn process_transaction_with_metadata_and_context(
355        self,
356        _: Context,
357        transaction: VersionedTransaction,
358    ) -> BanksTransactionResultWithMetadata {
359        let bank = self.bank_forks.read().unwrap().working_bank();
360        match bank.process_transaction_with_metadata(transaction) {
361            Err(error) => BanksTransactionResultWithMetadata {
362                result: Err(error),
363                metadata: None,
364            },
365            Ok(details) => BanksTransactionResultWithMetadata {
366                result: details.status,
367                metadata: Some(TransactionMetadata {
368                    compute_units_consumed: details.executed_units,
369                    log_messages: details.log_messages.unwrap_or_default(),
370                    return_data: details.return_data,
371                }),
372            },
373        }
374    }
375
376    async fn get_account_with_commitment_and_context(
377        self,
378        _: Context,
379        address: Pubkey,
380        commitment: CommitmentLevel,
381    ) -> Option<Account> {
382        let bank = self.bank(commitment);
383        bank.get_account(&address).map(Account::from)
384    }
385
386    async fn get_latest_blockhash_with_context(self, _: Context) -> Hash {
387        let bank = self.bank(CommitmentLevel::default());
388        bank.last_blockhash()
389    }
390
391    async fn get_latest_blockhash_with_commitment_and_context(
392        self,
393        _: Context,
394        commitment: CommitmentLevel,
395    ) -> Option<(Hash, u64)> {
396        let bank = self.bank(commitment);
397        let blockhash = bank.last_blockhash();
398        let last_valid_block_height = bank.get_blockhash_last_valid_block_height(&blockhash)?;
399        Some((blockhash, last_valid_block_height))
400    }
401
402    async fn get_fee_for_message_with_commitment_and_context(
403        self,
404        _: Context,
405        message: Message,
406        commitment: CommitmentLevel,
407    ) -> Option<u64> {
408        let bank = self.bank(commitment);
409        let sanitized_message =
410            SanitizedMessage::try_from_legacy_message(message, bank.get_reserved_account_keys())
411                .ok()?;
412        bank.get_fee_for_message(&sanitized_message)
413    }
414}
415
416pub async fn start_local_server(
417    bank_forks: Arc<RwLock<BankForks>>,
418    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
419    poll_signature_status_sleep_duration: Duration,
420) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
421    let banks_server = BanksServer::new_loopback(
422        bank_forks,
423        block_commitment_cache,
424        poll_signature_status_sleep_duration,
425    );
426    let (client_transport, server_transport) = transport::channel::unbounded();
427    let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve());
428    tokio::spawn(server);
429    client_transport
430}
431
432pub async fn start_tcp_server(
433    listen_addr: SocketAddr,
434    tpu_addr: SocketAddr,
435    bank_forks: Arc<RwLock<BankForks>>,
436    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
437    connection_cache: Arc<ConnectionCache>,
438    exit: Arc<AtomicBool>,
439) -> io::Result<()> {
440    // Note: These settings are copied straight from the tarpc example.
441    let server = tcp::listen(listen_addr, Bincode::default)
442        .await?
443        // Ignore accept errors.
444        .filter_map(|r| future::ready(r.ok()))
445        .map(server::BaseChannel::with_defaults)
446        // Limit channels to 1 per IP.
447        .max_channels_per_key(1, |t| {
448            t.as_ref()
449                .peer_addr()
450                .map(|x| x.ip())
451                .unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into())
452        })
453        // serve is generated by the service attribute. It takes as input any type implementing
454        // the generated Banks trait.
455        .map(move |chan| {
456            let (sender, receiver) = unbounded();
457
458            let client = ConnectionCacheClient::<NullTpuInfo>::new(
459                connection_cache.clone(),
460                tpu_addr,
461                None,
462                None,
463                0,
464            );
465
466            SendTransactionService::new_with_client(
467                &bank_forks,
468                receiver,
469                client,
470                Config {
471                    retry_rate_ms: 5_000,
472                    ..Config::default()
473                },
474                exit.clone(),
475            );
476
477            let server = BanksServer::new(
478                bank_forks.clone(),
479                block_commitment_cache.clone(),
480                sender,
481                Duration::from_millis(200),
482            );
483            chan.execute(server.serve())
484        })
485        // Max 10 channels.
486        .buffer_unordered(10)
487        .for_each(|_| async {});
488
489    server.await;
490    Ok(())
491}