1use {
2 bincode::{deserialize, serialize},
3 crossbeam_channel::{unbounded, Receiver, Sender},
4 futures::{future, prelude::stream::StreamExt},
5 solana_banks_interface::{
6 Banks, BanksRequest, BanksResponse, BanksTransactionResultWithMetadata,
7 BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionMetadata,
8 TransactionSimulationDetails, TransactionStatus,
9 },
10 solana_client::connection_cache::ConnectionCache,
11 solana_feature_set::{move_precompile_verification_to_svm, FeatureSet},
12 solana_runtime::{
13 bank::{Bank, TransactionSimulationResult},
14 bank_forks::BankForks,
15 commitment::BlockCommitmentCache,
16 verify_precompiles::verify_precompiles,
17 },
18 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
19 solana_sdk::{
20 account::Account,
21 clock::Slot,
22 commitment_config::CommitmentLevel,
23 hash::Hash,
24 message::{Message, SanitizedMessage},
25 pubkey::Pubkey,
26 signature::Signature,
27 transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction},
28 },
29 solana_send_transaction_service::{
30 send_transaction_service::{Config, SendTransactionService, TransactionInfo},
31 tpu_info::NullTpuInfo,
32 transaction_client::ConnectionCacheClient,
33 },
34 std::{
35 io,
36 net::{Ipv4Addr, SocketAddr},
37 sync::{atomic::AtomicBool, Arc, RwLock},
38 thread::Builder,
39 time::Duration,
40 },
41 tarpc::{
42 context::Context,
43 serde_transport::tcp,
44 server::{self, incoming::Incoming, Channel},
45 transport::{self, channel::UnboundedChannel},
46 ClientMessage, Response,
47 },
48 tokio::time::sleep,
49 tokio_serde::formats::Bincode,
50};
51
/// Server-side state behind the `Banks` RPC interface.
///
/// The struct is `Clone`; every field is either reference-counted, a channel
/// handle, or a plain `Duration`.
#[derive(Clone)]
struct BanksServer {
    /// Shared collection of banks that requests are answered from.
    bank_forks: Arc<RwLock<BankForks>>,
    /// Cache used to translate a `CommitmentLevel` into a slot.
    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
    /// Channel on which submitted transactions are handed off for processing.
    transaction_sender: Sender<TransactionInfo>,
    /// Pause between two consecutive signature-status polls.
    poll_signature_status_sleep_duration: Duration,
}
59
60impl BanksServer {
61 fn new(
66 bank_forks: Arc<RwLock<BankForks>>,
67 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
68 transaction_sender: Sender<TransactionInfo>,
69 poll_signature_status_sleep_duration: Duration,
70 ) -> Self {
71 Self {
72 bank_forks,
73 block_commitment_cache,
74 transaction_sender,
75 poll_signature_status_sleep_duration,
76 }
77 }
78
79 fn run(bank_forks: Arc<RwLock<BankForks>>, transaction_receiver: Receiver<TransactionInfo>) {
80 while let Ok(info) = transaction_receiver.recv() {
81 let mut transaction_infos = vec![info];
82 while let Ok(info) = transaction_receiver.try_recv() {
83 transaction_infos.push(info);
84 }
85 let transactions: Vec<_> = transaction_infos
86 .into_iter()
87 .map(|info| deserialize(&info.wire_transaction).unwrap())
88 .collect();
89 loop {
90 let bank = bank_forks.read().unwrap().working_bank();
91 let lock = bank.freeze_lock();
95 if *lock == Hash::default() {
96 let _ = bank.try_process_entry_transactions(transactions);
97 break;
99 }
100 }
101 }
102 }
103
104 fn new_loopback(
106 bank_forks: Arc<RwLock<BankForks>>,
107 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
108 poll_signature_status_sleep_duration: Duration,
109 ) -> Self {
110 let (transaction_sender, transaction_receiver) = unbounded();
111 let bank = bank_forks.read().unwrap().working_bank();
112 let slot = bank.slot();
113 {
114 let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
116 w_block_commitment_cache.set_all_slots(slot, slot);
117 }
118 let server_bank_forks = bank_forks.clone();
119 Builder::new()
120 .name("solBankForksCli".to_string())
121 .spawn(move || Self::run(server_bank_forks, transaction_receiver))
122 .unwrap();
123 Self::new(
124 bank_forks,
125 block_commitment_cache,
126 transaction_sender,
127 poll_signature_status_sleep_duration,
128 )
129 }
130
131 fn slot(&self, commitment: CommitmentLevel) -> Slot {
132 self.block_commitment_cache
133 .read()
134 .unwrap()
135 .slot_with_commitment(commitment)
136 }
137
138 fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
139 self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
140 }
141
142 async fn poll_signature_status(
143 self,
144 signature: &Signature,
145 blockhash: &Hash,
146 last_valid_block_height: u64,
147 commitment: CommitmentLevel,
148 ) -> Option<transaction::Result<()>> {
149 let mut status = self
150 .bank(commitment)
151 .get_signature_status_with_blockhash(signature, blockhash);
152 while status.is_none() {
153 sleep(self.poll_signature_status_sleep_duration).await;
154 let bank = self.bank(commitment);
155 if bank.block_height() > last_valid_block_height {
156 break;
157 }
158 status = bank.get_signature_status_with_blockhash(signature, blockhash);
159 }
160 status
161 }
162}
163
164fn verify_transaction(
165 transaction: &SanitizedTransaction,
166 feature_set: &Arc<FeatureSet>,
167) -> transaction::Result<()> {
168 transaction.verify()?;
169
170 let move_precompile_verification_to_svm =
171 feature_set.is_active(&move_precompile_verification_to_svm::id());
172 if !move_precompile_verification_to_svm {
173 verify_precompiles(transaction, feature_set)?;
174 }
175
176 Ok(())
177}
178
179fn simulate_transaction(
180 bank: &Bank,
181 transaction: VersionedTransaction,
182) -> BanksTransactionResultWithSimulation {
183 let sanitized_transaction = match RuntimeTransaction::try_create(
184 transaction,
185 MessageHash::Compute,
186 Some(false), bank,
188 bank.get_reserved_account_keys(),
189 ) {
190 Err(err) => {
191 return BanksTransactionResultWithSimulation {
192 result: Some(Err(err)),
193 simulation_details: None,
194 };
195 }
196 Ok(tx) => tx,
197 };
198 let TransactionSimulationResult {
199 result,
200 logs,
201 post_simulation_accounts: _,
202 units_consumed,
203 return_data,
204 inner_instructions,
205 } = bank.simulate_transaction_unchecked(&sanitized_transaction, true);
206
207 let simulation_details = TransactionSimulationDetails {
208 logs,
209 units_consumed,
210 return_data,
211 inner_instructions,
212 };
213 BanksTransactionResultWithSimulation {
214 result: Some(result),
215 simulation_details: Some(simulation_details),
216 }
217}
218
219#[tarpc::server]
220impl Banks for BanksServer {
221 async fn send_transaction_with_context(self, _: Context, transaction: VersionedTransaction) {
222 let blockhash = transaction.message.recent_blockhash();
223 let last_valid_block_height = self
224 .bank_forks
225 .read()
226 .unwrap()
227 .root_bank()
228 .get_blockhash_last_valid_block_height(blockhash)
229 .unwrap();
230 let signature = transaction.signatures.first().cloned().unwrap_or_default();
231 let info = TransactionInfo::new(
232 signature,
233 serialize(&transaction).unwrap(),
234 last_valid_block_height,
235 None,
236 None,
237 None,
238 );
239 self.transaction_sender.send(info).unwrap();
240 }
241
242 async fn get_transaction_status_with_context(
243 self,
244 _: Context,
245 signature: Signature,
246 ) -> Option<TransactionStatus> {
247 let bank = self.bank(CommitmentLevel::Processed);
248 let (slot, status) = bank.get_signature_status_slot(&signature)?;
249 let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
250
251 let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed);
252 let optimistically_confirmed =
253 optimistically_confirmed_bank.get_signature_status_slot(&signature);
254
255 let confirmations = if r_block_commitment_cache.root() >= slot
256 && r_block_commitment_cache.highest_super_majority_root() >= slot
257 {
258 None
259 } else {
260 r_block_commitment_cache
261 .get_confirmation_count(slot)
262 .or(Some(0))
263 };
264 Some(TransactionStatus {
265 slot,
266 confirmations,
267 err: status.err(),
268 confirmation_status: if confirmations.is_none() {
269 Some(TransactionConfirmationStatus::Finalized)
270 } else if optimistically_confirmed.is_some() {
271 Some(TransactionConfirmationStatus::Confirmed)
272 } else {
273 Some(TransactionConfirmationStatus::Processed)
274 },
275 })
276 }
277
278 async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
279 self.slot(commitment)
280 }
281
282 async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 {
283 self.bank(commitment).block_height()
284 }
285
286 async fn process_transaction_with_preflight_and_commitment_and_context(
287 self,
288 ctx: Context,
289 transaction: VersionedTransaction,
290 commitment: CommitmentLevel,
291 ) -> BanksTransactionResultWithSimulation {
292 let mut simulation_result =
293 simulate_transaction(&self.bank(commitment), transaction.clone());
294 if let Some(Ok(_)) = simulation_result.result {
297 simulation_result.result = self
298 .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
299 .await;
300 }
301 simulation_result
302 }
303
304 async fn simulate_transaction_with_commitment_and_context(
305 self,
306 _: Context,
307 transaction: VersionedTransaction,
308 commitment: CommitmentLevel,
309 ) -> BanksTransactionResultWithSimulation {
310 simulate_transaction(&self.bank(commitment), transaction)
311 }
312
313 async fn process_transaction_with_commitment_and_context(
314 self,
315 _: Context,
316 transaction: VersionedTransaction,
317 commitment: CommitmentLevel,
318 ) -> Option<transaction::Result<()>> {
319 let bank = self.bank(commitment);
320 let sanitized_transaction = match SanitizedTransaction::try_create(
321 transaction.clone(),
322 MessageHash::Compute,
323 Some(false), bank.as_ref(),
325 bank.get_reserved_account_keys(),
326 ) {
327 Ok(tx) => tx,
328 Err(err) => return Some(Err(err)),
329 };
330
331 if let Err(err) = verify_transaction(&sanitized_transaction, &bank.feature_set) {
332 return Some(Err(err));
333 }
334
335 let blockhash = transaction.message.recent_blockhash();
336 let last_valid_block_height = self
337 .bank(commitment)
338 .get_blockhash_last_valid_block_height(blockhash)
339 .unwrap();
340 let signature = sanitized_transaction.signature();
341 let info = TransactionInfo::new(
342 *signature,
343 serialize(&transaction).unwrap(),
344 last_valid_block_height,
345 None,
346 None,
347 None,
348 );
349 self.transaction_sender.send(info).unwrap();
350 self.poll_signature_status(signature, blockhash, last_valid_block_height, commitment)
351 .await
352 }
353
354 async fn process_transaction_with_metadata_and_context(
355 self,
356 _: Context,
357 transaction: VersionedTransaction,
358 ) -> BanksTransactionResultWithMetadata {
359 let bank = self.bank_forks.read().unwrap().working_bank();
360 match bank.process_transaction_with_metadata(transaction) {
361 Err(error) => BanksTransactionResultWithMetadata {
362 result: Err(error),
363 metadata: None,
364 },
365 Ok(details) => BanksTransactionResultWithMetadata {
366 result: details.status,
367 metadata: Some(TransactionMetadata {
368 compute_units_consumed: details.executed_units,
369 log_messages: details.log_messages.unwrap_or_default(),
370 return_data: details.return_data,
371 }),
372 },
373 }
374 }
375
376 async fn get_account_with_commitment_and_context(
377 self,
378 _: Context,
379 address: Pubkey,
380 commitment: CommitmentLevel,
381 ) -> Option<Account> {
382 let bank = self.bank(commitment);
383 bank.get_account(&address).map(Account::from)
384 }
385
386 async fn get_latest_blockhash_with_context(self, _: Context) -> Hash {
387 let bank = self.bank(CommitmentLevel::default());
388 bank.last_blockhash()
389 }
390
391 async fn get_latest_blockhash_with_commitment_and_context(
392 self,
393 _: Context,
394 commitment: CommitmentLevel,
395 ) -> Option<(Hash, u64)> {
396 let bank = self.bank(commitment);
397 let blockhash = bank.last_blockhash();
398 let last_valid_block_height = bank.get_blockhash_last_valid_block_height(&blockhash)?;
399 Some((blockhash, last_valid_block_height))
400 }
401
402 async fn get_fee_for_message_with_commitment_and_context(
403 self,
404 _: Context,
405 message: Message,
406 commitment: CommitmentLevel,
407 ) -> Option<u64> {
408 let bank = self.bank(commitment);
409 let sanitized_message =
410 SanitizedMessage::try_from_legacy_message(message, bank.get_reserved_account_keys())
411 .ok()?;
412 bank.get_fee_for_message(&sanitized_message)
413 }
414}
415
416pub async fn start_local_server(
417 bank_forks: Arc<RwLock<BankForks>>,
418 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
419 poll_signature_status_sleep_duration: Duration,
420) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
421 let banks_server = BanksServer::new_loopback(
422 bank_forks,
423 block_commitment_cache,
424 poll_signature_status_sleep_duration,
425 );
426 let (client_transport, server_transport) = transport::channel::unbounded();
427 let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve());
428 tokio::spawn(server);
429 client_transport
430}
431
432pub async fn start_tcp_server(
433 listen_addr: SocketAddr,
434 tpu_addr: SocketAddr,
435 bank_forks: Arc<RwLock<BankForks>>,
436 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
437 connection_cache: Arc<ConnectionCache>,
438 exit: Arc<AtomicBool>,
439) -> io::Result<()> {
440 let server = tcp::listen(listen_addr, Bincode::default)
442 .await?
443 .filter_map(|r| future::ready(r.ok()))
445 .map(server::BaseChannel::with_defaults)
446 .max_channels_per_key(1, |t| {
448 t.as_ref()
449 .peer_addr()
450 .map(|x| x.ip())
451 .unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into())
452 })
453 .map(move |chan| {
456 let (sender, receiver) = unbounded();
457
458 let client = ConnectionCacheClient::<NullTpuInfo>::new(
459 connection_cache.clone(),
460 tpu_addr,
461 None,
462 None,
463 0,
464 );
465
466 SendTransactionService::new_with_client(
467 &bank_forks,
468 receiver,
469 client,
470 Config {
471 retry_rate_ms: 5_000,
472 ..Config::default()
473 },
474 exit.clone(),
475 );
476
477 let server = BanksServer::new(
478 bank_forks.clone(),
479 block_commitment_cache.clone(),
480 sender,
481 Duration::from_millis(200),
482 );
483 chan.execute(server.serve())
484 })
485 .buffer_unordered(10)
487 .for_each(|_| async {});
488
489 server.await;
490 Ok(())
491}