1use {
2 bincode::{deserialize, serialize},
3 crossbeam_channel::{unbounded, Receiver, Sender},
4 futures::{future, prelude::stream::StreamExt},
5 solana_banks_interface::{
6 Banks, BanksRequest, BanksResponse, BanksTransactionResultWithMetadata,
7 BanksTransactionResultWithSimulation, TransactionConfirmationStatus, TransactionMetadata,
8 TransactionSimulationDetails, TransactionStatus,
9 },
10 solana_client::connection_cache::ConnectionCache,
11 solana_feature_set::{move_precompile_verification_to_svm, FeatureSet},
12 solana_runtime::{
13 bank::{Bank, TransactionSimulationResult},
14 bank_forks::BankForks,
15 commitment::BlockCommitmentCache,
16 verify_precompiles::verify_precompiles,
17 },
18 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
19 solana_sdk::{
20 account::Account,
21 clock::Slot,
22 commitment_config::CommitmentLevel,
23 hash::Hash,
24 message::{Message, SanitizedMessage},
25 pubkey::Pubkey,
26 signature::Signature,
27 transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction},
28 },
29 solana_send_transaction_service::{
30 send_transaction_service::{SendTransactionService, TransactionInfo},
31 tpu_info::NullTpuInfo,
32 },
33 std::{
34 io,
35 net::{Ipv4Addr, SocketAddr},
36 sync::{atomic::AtomicBool, Arc, RwLock},
37 thread::Builder,
38 time::Duration,
39 },
40 tarpc::{
41 context::Context,
42 serde_transport::tcp,
43 server::{self, incoming::Incoming, Channel},
44 transport::{self, channel::UnboundedChannel},
45 ClientMessage, Response,
46 },
47 tokio::time::sleep,
48 tokio_serde::formats::Bincode,
49};
50
/// Server-side state behind the `Banks` RPC implementation in this file.
///
/// The type derives `Clone`; its fields are two `Arc`s, a channel `Sender` and a
/// `Duration`.
#[derive(Clone)]
struct BanksServer {
    /// Shared fork set, used to look up a bank by slot and to fetch the working and
    /// root banks.
    bank_forks: Arc<RwLock<BankForks>>,
    /// Cache consulted to translate a `CommitmentLevel` into a slot and to read
    /// root / confirmation information.
    block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
    /// Channel on which submitted transactions are handed off as `TransactionInfo`.
    transaction_sender: Sender<TransactionInfo>,
    /// Pause between two consecutive lookups in `poll_signature_status`.
    poll_signature_status_sleep_duration: Duration,
}
58
59impl BanksServer {
    /// Builds a `BanksServer` from its four parts, storing every argument unchanged.
    ///
    /// Nothing is spawned and no lock is taken here; `transaction_sender` is only kept
    /// so that the RPC handlers can push `TransactionInfo` values into it later.
    fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        transaction_sender: Sender<TransactionInfo>,
        poll_signature_status_sleep_duration: Duration,
    ) -> Self {
        Self {
            bank_forks,
            block_commitment_cache,
            transaction_sender,
            poll_signature_status_sleep_duration,
        }
    }
77
78 fn run(bank_forks: Arc<RwLock<BankForks>>, transaction_receiver: Receiver<TransactionInfo>) {
79 while let Ok(info) = transaction_receiver.recv() {
80 let mut transaction_infos = vec![info];
81 while let Ok(info) = transaction_receiver.try_recv() {
82 transaction_infos.push(info);
83 }
84 let transactions: Vec<_> = transaction_infos
85 .into_iter()
86 .map(|info| deserialize(&info.wire_transaction).unwrap())
87 .collect();
88 loop {
89 let bank = bank_forks.read().unwrap().working_bank();
90 let lock = bank.freeze_lock();
94 if *lock == Hash::default() {
95 let _ = bank.try_process_entry_transactions(transactions);
96 break;
98 }
99 }
100 }
101 }
102
103 fn new_loopback(
105 bank_forks: Arc<RwLock<BankForks>>,
106 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
107 poll_signature_status_sleep_duration: Duration,
108 ) -> Self {
109 let (transaction_sender, transaction_receiver) = unbounded();
110 let bank = bank_forks.read().unwrap().working_bank();
111 let slot = bank.slot();
112 {
113 let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
115 w_block_commitment_cache.set_all_slots(slot, slot);
116 }
117 let server_bank_forks = bank_forks.clone();
118 Builder::new()
119 .name("solBankForksCli".to_string())
120 .spawn(move || Self::run(server_bank_forks, transaction_receiver))
121 .unwrap();
122 Self::new(
123 bank_forks,
124 block_commitment_cache,
125 transaction_sender,
126 poll_signature_status_sleep_duration,
127 )
128 }
129
130 fn slot(&self, commitment: CommitmentLevel) -> Slot {
131 self.block_commitment_cache
132 .read()
133 .unwrap()
134 .slot_with_commitment(commitment)
135 }
136
137 fn bank(&self, commitment: CommitmentLevel) -> Arc<Bank> {
138 self.bank_forks.read().unwrap()[self.slot(commitment)].clone()
139 }
140
141 async fn poll_signature_status(
142 self,
143 signature: &Signature,
144 blockhash: &Hash,
145 last_valid_block_height: u64,
146 commitment: CommitmentLevel,
147 ) -> Option<transaction::Result<()>> {
148 let mut status = self
149 .bank(commitment)
150 .get_signature_status_with_blockhash(signature, blockhash);
151 while status.is_none() {
152 sleep(self.poll_signature_status_sleep_duration).await;
153 let bank = self.bank(commitment);
154 if bank.block_height() > last_valid_block_height {
155 break;
156 }
157 status = bank.get_signature_status_with_blockhash(signature, blockhash);
158 }
159 status
160 }
161}
162
163fn verify_transaction(
164 transaction: &SanitizedTransaction,
165 feature_set: &Arc<FeatureSet>,
166) -> transaction::Result<()> {
167 transaction.verify()?;
168
169 let move_precompile_verification_to_svm =
170 feature_set.is_active(&move_precompile_verification_to_svm::id());
171 if !move_precompile_verification_to_svm {
172 verify_precompiles(transaction, feature_set)?;
173 }
174
175 Ok(())
176}
177
178fn simulate_transaction(
179 bank: &Bank,
180 transaction: VersionedTransaction,
181) -> BanksTransactionResultWithSimulation {
182 let sanitized_transaction = match RuntimeTransaction::try_create(
183 transaction,
184 MessageHash::Compute,
185 Some(false), bank,
187 bank.get_reserved_account_keys(),
188 ) {
189 Err(err) => {
190 return BanksTransactionResultWithSimulation {
191 result: Some(Err(err)),
192 simulation_details: None,
193 };
194 }
195 Ok(tx) => tx,
196 };
197 let TransactionSimulationResult {
198 result,
199 logs,
200 post_simulation_accounts: _,
201 units_consumed,
202 return_data,
203 inner_instructions,
204 } = bank.simulate_transaction_unchecked(&sanitized_transaction, true);
205
206 let simulation_details = TransactionSimulationDetails {
207 logs,
208 units_consumed,
209 return_data,
210 inner_instructions,
211 };
212 BanksTransactionResultWithSimulation {
213 result: Some(result),
214 simulation_details: Some(simulation_details),
215 }
216}
217
218#[tarpc::server]
219impl Banks for BanksServer {
220 async fn send_transaction_with_context(self, _: Context, transaction: VersionedTransaction) {
221 let blockhash = transaction.message.recent_blockhash();
222 let last_valid_block_height = self
223 .bank_forks
224 .read()
225 .unwrap()
226 .root_bank()
227 .get_blockhash_last_valid_block_height(blockhash)
228 .unwrap();
229 let signature = transaction.signatures.first().cloned().unwrap_or_default();
230 let info = TransactionInfo::new(
231 signature,
232 serialize(&transaction).unwrap(),
233 last_valid_block_height,
234 None,
235 None,
236 None,
237 );
238 self.transaction_sender.send(info).unwrap();
239 }
240
241 async fn get_transaction_status_with_context(
242 self,
243 _: Context,
244 signature: Signature,
245 ) -> Option<TransactionStatus> {
246 let bank = self.bank(CommitmentLevel::Processed);
247 let (slot, status) = bank.get_signature_status_slot(&signature)?;
248 let r_block_commitment_cache = self.block_commitment_cache.read().unwrap();
249
250 let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed);
251 let optimistically_confirmed =
252 optimistically_confirmed_bank.get_signature_status_slot(&signature);
253
254 let confirmations = if r_block_commitment_cache.root() >= slot
255 && r_block_commitment_cache.highest_super_majority_root() >= slot
256 {
257 None
258 } else {
259 r_block_commitment_cache
260 .get_confirmation_count(slot)
261 .or(Some(0))
262 };
263 Some(TransactionStatus {
264 slot,
265 confirmations,
266 err: status.err(),
267 confirmation_status: if confirmations.is_none() {
268 Some(TransactionConfirmationStatus::Finalized)
269 } else if optimistically_confirmed.is_some() {
270 Some(TransactionConfirmationStatus::Confirmed)
271 } else {
272 Some(TransactionConfirmationStatus::Processed)
273 },
274 })
275 }
276
    /// RPC entry point that returns the slot for `commitment`.
    ///
    /// Delegates directly to `BanksServer::slot`; the request context is unused.
    async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot {
        self.slot(commitment)
    }
280
281 async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 {
282 self.bank(commitment).block_height()
283 }
284
285 async fn process_transaction_with_preflight_and_commitment_and_context(
286 self,
287 ctx: Context,
288 transaction: VersionedTransaction,
289 commitment: CommitmentLevel,
290 ) -> BanksTransactionResultWithSimulation {
291 let mut simulation_result =
292 simulate_transaction(&self.bank(commitment), transaction.clone());
293 if let Some(Ok(_)) = simulation_result.result {
296 simulation_result.result = self
297 .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
298 .await;
299 }
300 simulation_result
301 }
302
303 async fn simulate_transaction_with_commitment_and_context(
304 self,
305 _: Context,
306 transaction: VersionedTransaction,
307 commitment: CommitmentLevel,
308 ) -> BanksTransactionResultWithSimulation {
309 simulate_transaction(&self.bank(commitment), transaction)
310 }
311
312 async fn process_transaction_with_commitment_and_context(
313 self,
314 _: Context,
315 transaction: VersionedTransaction,
316 commitment: CommitmentLevel,
317 ) -> Option<transaction::Result<()>> {
318 let bank = self.bank(commitment);
319 let sanitized_transaction = match SanitizedTransaction::try_create(
320 transaction.clone(),
321 MessageHash::Compute,
322 Some(false), bank.as_ref(),
324 bank.get_reserved_account_keys(),
325 ) {
326 Ok(tx) => tx,
327 Err(err) => return Some(Err(err)),
328 };
329
330 if let Err(err) = verify_transaction(&sanitized_transaction, &bank.feature_set) {
331 return Some(Err(err));
332 }
333
334 let blockhash = transaction.message.recent_blockhash();
335 let last_valid_block_height = self
336 .bank(commitment)
337 .get_blockhash_last_valid_block_height(blockhash)
338 .unwrap();
339 let signature = sanitized_transaction.signature();
340 let info = TransactionInfo::new(
341 *signature,
342 serialize(&transaction).unwrap(),
343 last_valid_block_height,
344 None,
345 None,
346 None,
347 );
348 self.transaction_sender.send(info).unwrap();
349 self.poll_signature_status(signature, blockhash, last_valid_block_height, commitment)
350 .await
351 }
352
353 async fn process_transaction_with_metadata_and_context(
354 self,
355 _: Context,
356 transaction: VersionedTransaction,
357 ) -> BanksTransactionResultWithMetadata {
358 let bank = self.bank_forks.read().unwrap().working_bank();
359 match bank.process_transaction_with_metadata(transaction) {
360 Err(error) => BanksTransactionResultWithMetadata {
361 result: Err(error),
362 metadata: None,
363 },
364 Ok(details) => BanksTransactionResultWithMetadata {
365 result: details.status,
366 metadata: Some(TransactionMetadata {
367 compute_units_consumed: details.executed_units,
368 log_messages: details.log_messages.unwrap_or_default(),
369 return_data: details.return_data,
370 }),
371 },
372 }
373 }
374
375 async fn get_account_with_commitment_and_context(
376 self,
377 _: Context,
378 address: Pubkey,
379 commitment: CommitmentLevel,
380 ) -> Option<Account> {
381 let bank = self.bank(commitment);
382 bank.get_account(&address).map(Account::from)
383 }
384
385 async fn get_latest_blockhash_with_context(self, _: Context) -> Hash {
386 let bank = self.bank(CommitmentLevel::default());
387 bank.last_blockhash()
388 }
389
390 async fn get_latest_blockhash_with_commitment_and_context(
391 self,
392 _: Context,
393 commitment: CommitmentLevel,
394 ) -> Option<(Hash, u64)> {
395 let bank = self.bank(commitment);
396 let blockhash = bank.last_blockhash();
397 let last_valid_block_height = bank.get_blockhash_last_valid_block_height(&blockhash)?;
398 Some((blockhash, last_valid_block_height))
399 }
400
401 async fn get_fee_for_message_with_commitment_and_context(
402 self,
403 _: Context,
404 message: Message,
405 commitment: CommitmentLevel,
406 ) -> Option<u64> {
407 let bank = self.bank(commitment);
408 let sanitized_message =
409 SanitizedMessage::try_from_legacy_message(message, bank.get_reserved_account_keys())
410 .ok()?;
411 bank.get_fee_for_message(&sanitized_message)
412 }
413}
414
415pub async fn start_local_server(
416 bank_forks: Arc<RwLock<BankForks>>,
417 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
418 poll_signature_status_sleep_duration: Duration,
419) -> UnboundedChannel<Response<BanksResponse>, ClientMessage<BanksRequest>> {
420 let banks_server = BanksServer::new_loopback(
421 bank_forks,
422 block_commitment_cache,
423 poll_signature_status_sleep_duration,
424 );
425 let (client_transport, server_transport) = transport::channel::unbounded();
426 let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve());
427 tokio::spawn(server);
428 client_transport
429}
430
431pub async fn start_tcp_server(
432 listen_addr: SocketAddr,
433 tpu_addr: SocketAddr,
434 bank_forks: Arc<RwLock<BankForks>>,
435 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
436 connection_cache: Arc<ConnectionCache>,
437 exit: Arc<AtomicBool>,
438) -> io::Result<()> {
439 let server = tcp::listen(listen_addr, Bincode::default)
441 .await?
442 .filter_map(|r| future::ready(r.ok()))
444 .map(server::BaseChannel::with_defaults)
445 .max_channels_per_key(1, |t| {
447 t.as_ref()
448 .peer_addr()
449 .map(|x| x.ip())
450 .unwrap_or_else(|_| Ipv4Addr::UNSPECIFIED.into())
451 })
452 .map(move |chan| {
455 let (sender, receiver) = unbounded();
456
457 SendTransactionService::new::<NullTpuInfo>(
458 tpu_addr,
459 &bank_forks,
460 None,
461 receiver,
462 &connection_cache,
463 5_000,
464 0,
465 exit.clone(),
466 );
467
468 let server = BanksServer::new(
469 bank_forks.clone(),
470 block_commitment_cache.clone(),
471 sender,
472 Duration::from_millis(200),
473 );
474 chan.execute(server.serve())
475 })
476 .buffer_unordered(10)
478 .for_each(|_| async {});
479
480 server.await;
481 Ok(())
482}