1use {
2 bincode::{deserialize, serialize},
3 crossbeam_channel::{unbounded, Receiver, Sender},
4 futures::{future, prelude::stream::StreamExt},
5 solana_banks_interface::{
6 Banks, BanksRequest, BanksResponse, BanksTransactionResultWithMetadata,
7 BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionMetadata,
8 TransactionSimulationDetails, TransactionStatus,
9 },
10 solana_client::connection_cache::ConnectionCache,
11 solana_feature_set::{move_precompile_verification_to_svm, FeatureSet},
12 solana_runtime::{
13 bank::{Bank, TransactionSimulationResult},
14 bank_forks::BankForks,
15 commitment::BlockCommitmentCache,
16 verify_precompiles::verify_precompiles,
17 },
18 solana_sdk::{
19 account::Account,
20 clock::Slot,
21 commitment_config::CommitmentLevel,
22 hash::Hash,
23 message::{Message, SanitizedMessage},
24 pubkey::Pubkey,
25 signature::Signature,
26 transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction},
27 },
28 solana_send_transaction_service::{
29 send_transaction_service::{SendTransactionService, TransactionInfo},
30 tpu_info::NullTpuInfo,
31 },
32 std::{
33 io,
34 net::{Ipv4Addr, SocketAddr},
35 sync::{atomic::AtomicBool, Arc, RwLock},
36 thread::Builder,
37 time::Duration,
38 },
39 tarpc::{
40 context::Context,
41 serde_transport::tcp,
42 server::{self, incoming::Incoming, Channel},
43 transport::{self, channel::UnboundedChannel},
44 ClientMessage, Response,
45 },
46 tokio::time::sleep,
47 tokio_serde::formats::Bincode,
48};
49
50#[derive(Clone)]
51struct BanksServer {
52 bank_forks: Arc<RwLock<BankForks>>,
53 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
54 transaction_sender: Sender<TransactionInfo>,
55 poll_signature_status_sleep_duration: Duration,
56}
57
58impl BanksServer {
59 fn new(
64 bank_forks: Arc<RwLock<BankForks>>,
65 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
66 transaction_sender: Sender<TransactionInfo>,
67 poll_signature_status_sleep_duration: Duration,
68 ) -> Self {
69 Self {
70 bank_forks,
71 block_commitment_cache,
72 transaction_sender,
73 poll_signature_status_sleep_duration,
74 }
75 }
76
77 fn run(bank_forks: Arc<RwLock<BankForks>>, transaction_receiver: Receiver<TransactionInfo>) {
78 while let Ok(info) = transaction_receiver.recv() {
79 let mut transaction_infos = vec![info];
80 while let Ok(info) = transaction_receiver.try_recv() {
81 transaction_infos.push(info);
82 }
83 let transactions: Vec<_> = transaction_infos
84 .into_iter()
85 .map(|info| deserialize(&info.wire_transaction).unwrap())
86 .collect();
87 loop {
88 let bank = bank_forks.read().unwrap().working_bank();
89 let lock = bank.freeze_lock();
93 if *lock == Hash::default() {
94 let _ = bank.try_process_entry_transactions(transactions);
95 break;
97 }
98 }
99 }
100 }
101
102 fn new_loopback(
104 bank_forks: Arc<RwLock<BankForks>>,
105 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
106 poll_signature_status_sleep_duration: Duration,
107 ) -> Self {
108 let (transaction_sender, transaction_receiver) = unbounded();
109 let bank = bank_forks.read().unwrap().working_bank();
110 let slot = bank.slot();
111 {
112 let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
114 w_block_commitment_cache.set_all_slots(slot, slot);
115 }
116 let server_bank_forks = bank_forks.clone();
117 Builder::new()
118 .name("solBankForksCli".to_string())
119 .spawn(move || Self::run(server_bank_forks, transaction_receiver))
120 .unwrap();
121 Self::new(
122 bank_forks,
123 block_commitment_cache,
124 transaction_sender,
125 poll_signature_status_sleep_duration,
126 )
127 }
128
129 fn slot(&self, commitment: CommitmentLevel) -> Slot {
130 self.block_commitment_cache
131 .read()
132 .unwrap()
133 .slot_with_commitment(commitment)
134 }
135
136 fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
137 self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
138 }
139
140 async fn poll_signature_status(
141 self,
142 signature: &Signature,
143 blockhash: &Hash,
144 last_valid_block_height: u64,
145 commitment: CommitmentLevel,
146 ) -> Option<transaction::Result<()>> {
147 let mut status = self
148 .bank(commitment)
149 .get_signature_status_with_blockhash(signature, blockhash);
150 while status.is_none() {
151 sleep(self.poll_signature_status_sleep_duration).await;
152 let bank = self.bank(commitment);
153 if bank.block_height() > last_valid_block_height {
154 break;
155 }
156 status = bank.get_signature_status_with_blockhash(signature, blockhash);
157 }
158 status
159 }
160}
161
162fn verify_transaction(
163 transaction: &SanitizedTransaction,
164 feature_set: &Arc<FeatureSet>,
165) -> transaction::Result<()> {
166 transaction.verify()?;
167
168 let move_precompile_verification_to_svm =
169 feature_set.is_active(&move_precompile_verification_to_svm::id());
170 if !move_precompile_verification_to_svm {
171 verify_precompiles(transaction, feature_set)?;
172 }
173
174 Ok(())
175}
176
177fn simulate_transaction(
178 bank: &Bank,
179 transaction: VersionedTransaction,
180) -> BanksTransactionResultWithSimulation {
181 let sanitized_transaction = match SanitizedTransaction::try_create(
182 transaction,
183 MessageHash::Compute,
184 Some(false), bank,
186 bank.get_reserved_account_keys(),
187 ) {
188 Err(err) => {
189 return BanksTransactionResultWithSimulation {
190 result: Some(Err(err)),
191 simulation_details: None,
192 };
193 }
194 Ok(tx) => tx,
195 };
196 let TransactionSimulationResult {
197 result,
198 logs,
199 post_simulation_accounts: _,
200 units_consumed,
201 return_data,
202 inner_instructions,
203 } = bank.simulate_transaction_unchecked(&sanitized_transaction, true);
204
205 let simulation_details = TransactionSimulationDetails {
206 logs,
207 units_consumed,
208 return_data,
209 inner_instructions,
210 };
211 BanksTransactionResultWithSimulation {
212 result: Some(result),
213 simulation_details: Some(simulation_details),
214 }
215}
216
217#[tarpc::server]
218impl Banks for BanksServer {
219 async fn send_transaction_with_context(self, _: Context, transaction: VersionedTransaction) {
220 let blockhash = transaction.message.recent_blockhash();
221 let last_valid_block_height = self
222 .bank_forks
223 .read()
224 .unwrap()
225 .root_bank()
226 .get_blockhash_last_valid_block_height(blockhash)
227 .unwrap();
228 let signature = transaction.signatures.first().cloned().unwrap_or_default();
229 let info = TransactionInfo::new(
230 signature,
231 serialize(&transaction).unwrap(),
232 last_valid_block_height,
233 None,
234 None,
235 None,
236 );
237 self.transaction_sender.send(info).unwrap();
238 }
239
240 async fn get_transaction_status_with_context(
241 self,
242 _: Context,
243 signature: Signature,
244 ) -> Option<TransactionStatus> {
245 let bank = self.bank(CommitmentLevel::Processed);
246 let (slot, status) = bank.get_signature_status_slot(&signature)?;
247 let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
248
249 let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed);
250 let optimistically_confirmed =
251 optimistically_confirmed_bank.get_signature_status_slot(&signature);
252
253 let confirmations = if r_block_commitment_cache.root() >= slot
254 && r_block_commitment_cache.highest_super_majority_root() >= slot
255 {
256 None
257 } else {
258 r_block_commitment_cache
259 .get_confirmation_count(slot)
260 .or(Some(0))
261 };
262 Some(TransactionStatus {
263 slot,
264 confirmations,
265 err: status.err(),
266 confirmation_status: if confirmations.is_none() {
267 Some(TransactionConfirmationStatus::Finalized)
268 } else if optimistically_confirmed.is_some() {
269 Some(TransactionConfirmationStatus::Confirmed)
270 } else {
271 Some(TransactionConfirmationStatus::Processed)
272 },
273 })
274 }
275
276 async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
277 self.slot(commitment)
278 }
279
280 async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 {
281 self.bank(commitment).block_height()
282 }
283
284 async fn process_transaction_with_preflight_and_commitment_and_context(
285 self,
286 ctx: Context,
287 transaction: VersionedTransaction,
288 commitment: CommitmentLevel,
289 ) -> BanksTransactionResultWithSimulation {
290 let mut simulation_result =
291 simulate_transaction(&self.bank(commitment), transaction.clone());
292 if let Some(Ok(_)) = simulation_result.result {
295 simulation_result.result = self
296 .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
297 .await;
298 }
299 simulation_result
300 }
301
302 async fn simulate_transaction_with_commitment_and_context(
303 self,
304 _: Context,
305 transaction: VersionedTransaction,
306 commitment: CommitmentLevel,
307 ) -> BanksTransactionResultWithSimulation {
308 simulate_transaction(&self.bank(commitment), transaction)
309 }
310
311 async fn process_transaction_with_commitment_and_context(
312 self,
313 _: Context,
314 transaction: VersionedTransaction,
315 commitment: CommitmentLevel,
316 ) -> Option<transaction::Result<()>> {
317 let bank = self.bank(commitment);
318 let sanitized_transaction = match SanitizedTransaction::try_create(
319 transaction.clone(),
320 MessageHash::Compute,
321 Some(false), bank.as_ref(),
323 bank.get_reserved_account_keys(),
324 ) {
325 Ok(tx) => tx,
326 Err(err) => return Some(Err(err)),
327 };
328
329 if let Err(err) = verify_transaction(&sanitized_transaction, &bank.feature_set) {
330 return Some(Err(err));
331 }
332
333 let blockhash = transaction.message.recent_blockhash();
334 let last_valid_block_height = self
335 .bank(commitment)
336 .get_blockhash_last_valid_block_height(blockhash)
337 .unwrap();
338 let signature = sanitized_transaction.signature();
339 let info = TransactionInfo::new(
340 *signature,
341 serialize(&transaction).unwrap(),
342 last_valid_block_height,
343 None,
344 None,
345 None,
346 );
347 self.transaction_sender.send(info).unwrap();
348 self.poll_signature_status(signature, blockhash, last_valid_block_height, commitment)
349 .await
350 }
351
352 async fn process_transaction_with_metadata_and_context(
353 self,
354 _: Context,
355 transaction: VersionedTransaction,
356 ) -> BanksTransactionResultWithMetadata {
357 let bank = self.bank_forks.read().unwrap().working_bank();
358 match bank.process_transaction_with_metadata(transaction) {
359 Err(error) => BanksTransactionResultWithMetadata {
360 result: Err(error),
361 metadata: None,
362 },
363 Ok(details) => BanksTransactionResultWithMetadata {
364 result: details.status,
365 metadata: Some(TransactionMetadata {
366 compute_units_consumed: details.executed_units,
367 log_messages: details.log_messages.unwrap_or_default(),
368 return_data: details.return_data,
369 }),
370 },
371 }
372 }
373
374 async fn get_account_with_commitment_and_context(
375 self,
376 _: Context,
377 address: Pubkey,
378 commitment: CommitmentLevel,
379 ) -> Option<Account> {
380 let bank = self.bank(commitment);
381 bank.get_account(&address).map(Account::from)
382 }
383
384 async fn get_latest_blockhash_with_context(self, _: Context) -> Hash {
385 let bank = self.bank(CommitmentLevel::default());
386 bank.last_blockhash()
387 }
388
389 async fn get_latest_blockhash_with_commitment_and_context(
390 self,
391 _: Context,
392 commitment: CommitmentLevel,
393 ) -> Option<(Hash, u64)> {
394 let bank = self.bank(commitment);
395 let blockhash = bank.last_blockhash();
396 let last_valid_block_height = bank.get_blockhash_last_valid_block_height(&blockhash)?;
397 Some((blockhash, last_valid_block_height))
398 }
399
400 async fn get_fee_for_message_with_commitment_and_context(
401 self,
402 _: Context,
403 message: Message,
404 commitment: CommitmentLevel,
405 ) -> Option<u64> {
406 let bank = self.bank(commitment);
407 let sanitized_message =
408 SanitizedMessage::try_from_legacy_message(message, bank.get_reserved_account_keys())
409 .ok()?;
410 bank.get_fee_for_message(&sanitized_message)
411 }
412}
413
414pub async fn start_local_server(
415 bank_forks: Arc<RwLock<BankForks>>,
416 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
417 poll_signature_status_sleep_duration: Duration,
418) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
419 let banks_server = BanksServer::new_loopback(
420 bank_forks,
421 block_commitment_cache,
422 poll_signature_status_sleep_duration,
423 );
424 let (client_transport, server_transport) = transport::channel::unbounded();
425 let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve());
426 tokio::spawn(server);
427 client_transport
428}
429
430pub async fn start_tcp_server(
431 listen_addr: SocketAddr,
432 tpu_addr: SocketAddr,
433 bank_forks: Arc<RwLock<BankForks>>,
434 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
435 connection_cache: Arc<ConnectionCache>,
436 exit: Arc<AtomicBool>,
437) -> io::Result<()> {
438 let server = tcp::listen(listen_addr, Bincode::default)
440 .await?
441 .filter_map(|r| future::ready(r.ok()))
443 .map(server::BaseChannel::with_defaults)
444 .max_channels_per_key(1, |t| {
446 t.as_ref()
447 .peer_addr()
448 .map(|x| x.ip())
449 .unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into())
450 })
451 .map(move |chan| {
454 let (sender, receiver) = unbounded();
455
456 SendTransactionService::new::<NullTpuInfo>(
457 tpu_addr,
458 &bank_forks,
459 None,
460 receiver,
461 &connection_cache,
462 5_000,
463 0,
464 exit.clone(),
465 );
466
467 let server = BanksServer::new(
468 bank_forks.clone(),
469 block_commitment_cache.clone(),
470 sender,
471 Duration::from_millis(200),
472 );
473 chan.execute(server.serve())
474 })
475 .buffer_unordered(10)
477 .for_each(|_| async {});
478
479 server.await;
480 Ok(())
481}