safecoin_client/
quic_client.rs1use {
5 crate::{
6 connection_cache::ConnectionCacheStats,
7 nonblocking::{
8 quic_client::{
9 QuicClient, QuicLazyInitializedEndpoint,
10 QuicTpuConnection as NonblockingQuicTpuConnection,
11 },
12 tpu_connection::TpuConnection as NonblockingTpuConnection,
13 },
14 tpu_connection::{ClientStats, TpuConnection},
15 },
16 lazy_static::lazy_static,
17 log::*,
18 solana_sdk::transport::{Result as TransportResult, TransportError},
19 std::{
20 net::SocketAddr,
21 sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
22 time::Duration,
23 },
24 tokio::{runtime::Runtime, time::timeout},
25};
26
27const MAX_OUTSTANDING_TASK: u64 = 2000;
28const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;
29
30struct AsyncTaskSemaphore {
34 counter: Mutex<u64>,
36 cond_var: Condvar,
38 permits: u64,
40}
41
42impl AsyncTaskSemaphore {
43 fn new(permits: u64) -> Self {
44 Self {
45 counter: Mutex::new(0),
46 cond_var: Condvar::new(),
47 permits,
48 }
49 }
50
51 fn acquire(&self) -> MutexGuard<u64> {
55 let mut count = self.counter.lock().unwrap();
56 *count += 1;
57 while *count > self.permits {
58 count = self.cond_var.wait(count).unwrap();
59 }
60 count
61 }
62
63 fn release(&self) {
65 let mut count = self.counter.lock().unwrap();
66 *count -= 1;
67 self.cond_var.notify_one();
68 }
69}
70
71lazy_static! {
72 static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
73 AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
74 static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
75 .enable_all()
76 .build()
77 .unwrap();
78}
79
80pub struct QuicTpuConnection {
81 inner: Arc<NonblockingQuicTpuConnection>,
82}
83impl QuicTpuConnection {
84 pub fn new(
85 endpoint: Arc<QuicLazyInitializedEndpoint>,
86 tpu_addr: SocketAddr,
87 connection_stats: Arc<ConnectionCacheStats>,
88 ) -> Self {
89 let inner = Arc::new(NonblockingQuicTpuConnection::new(
90 endpoint,
91 tpu_addr,
92 connection_stats,
93 ));
94 Self { inner }
95 }
96
97 pub fn new_with_client(
98 client: Arc<QuicClient>,
99 connection_stats: Arc<ConnectionCacheStats>,
100 ) -> Self {
101 let inner = Arc::new(NonblockingQuicTpuConnection::new_with_client(
102 client,
103 connection_stats,
104 ));
105 Self { inner }
106 }
107}
108
109async fn send_wire_transaction_async(
110 connection: Arc<NonblockingQuicTpuConnection>,
111 wire_transaction: Vec<u8>,
112) -> TransportResult<()> {
113 let result = timeout(
114 Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
115 connection.send_wire_transaction(wire_transaction),
116 )
117 .await;
118 ASYNC_TASK_SEMAPHORE.release();
119 handle_send_result(result, connection)
120}
121
122async fn send_wire_transaction_batch_async(
123 connection: Arc<NonblockingQuicTpuConnection>,
124 buffers: Vec<Vec<u8>>,
125) -> TransportResult<()> {
126 let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64;
127
128 let result = timeout(
129 Duration::from_millis(time_out),
130 connection.send_wire_transaction_batch(&buffers),
131 )
132 .await;
133 ASYNC_TASK_SEMAPHORE.release();
134 handle_send_result(result, connection)
135}
136
137fn handle_send_result(
139 result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
140 connection: Arc<NonblockingQuicTpuConnection>,
141) -> Result<(), TransportError> {
142 match result {
143 Ok(result) => result,
144 Err(_err) => {
145 let client_stats = ClientStats::default();
146 client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
147 let stats = connection.connection_stats();
148 stats.add_client_stats(&client_stats, 0, false);
149 info!("Timedout sending transaction {:?}", connection.tpu_addr());
150 Err(TransportError::Custom(
151 "Timedout sending transaction".to_string(),
152 ))
153 }
154 }
155}
156
157impl TpuConnection for QuicTpuConnection {
158 fn tpu_addr(&self) -> &SocketAddr {
159 self.inner.tpu_addr()
160 }
161
162 fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
163 where
164 T: AsRef<[u8]> + Send + Sync,
165 {
166 RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
167 Ok(())
168 }
169
170 fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
171 let _lock = ASYNC_TASK_SEMAPHORE.acquire();
172 let inner = self.inner.clone();
173
174 let _ = RUNTIME
175 .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await });
176 Ok(())
177 }
178
179 fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
180 let _lock = ASYNC_TASK_SEMAPHORE.acquire();
181 let inner = self.inner.clone();
182 let _ =
183 RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await });
184 Ok(())
185 }
186}