solana_quic_client/
quic_client.rs1use {
5 crate::nonblocking::quic_client::{
6 QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint,
7 },
8 lazy_static::lazy_static,
9 log::*,
10 solana_connection_cache::{
11 client_connection::{ClientConnection, ClientStats},
12 connection_cache_stats::ConnectionCacheStats,
13 nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
14 },
15 solana_transaction_error::{TransportError, TransportResult},
16 std::{
17 net::SocketAddr,
18 sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
19 time::Duration,
20 },
21 tokio::{runtime::Runtime, time::timeout},
22};
23
/// Number of permits handed to the global `ASYNC_TASK_SEMAPHORE`, i.e. the cap
/// on asynchronous send tasks that may be outstanding at the same time.
pub const MAX_OUTSTANDING_TASK: u64 = 2_000;

/// Time budget for sending a single buffer; batch sends scale this by the
/// number of buffers in the batch.
const SEND_DATA_TIMEOUT: Duration = Duration::from_secs(10);
26
/// A counting semaphore that limits how many asynchronous tasks may be
/// outstanding at once.
///
/// Call [`AsyncTaskSemaphore::acquire`] before spawning a task and
/// [`AsyncTaskSemaphore::release`] once the task has finished, whether it
/// succeeded or failed.
struct AsyncTaskSemaphore {
    /// Number of permits currently held.
    counter: Mutex<u64>,
    /// Signalled each time a permit is returned.
    cond_var: Condvar,
    /// Maximum number of permits that may be held at the same time.
    permits: u64,
}

impl AsyncTaskSemaphore {
    /// Creates a semaphore with `permits` permits, none of which are held.
    pub fn new(permits: u64) -> Self {
        Self {
            counter: Mutex::new(0),
            cond_var: Condvar::new(),
            permits,
        }
    }

    /// Blocks the calling thread until a permit is free, then takes it.
    ///
    /// The returned guard still holds the counter lock and observes the
    /// already-incremented count. Dropping the guard only unlocks the counter;
    /// the permit stays taken until [`AsyncTaskSemaphore::release`] is called.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn acquire(&self) -> MutexGuard<'_, u64> {
        let guard = self.counter.lock().unwrap();
        // Wait *before* counting ourselves. If blocked callers were included in
        // the counter, then with more waiters than permits the counter could
        // never drop back to `permits` after the running tasks finish, and
        // every waiter would sleep forever.
        let mut held = self
            .cond_var
            .wait_while(guard, |held| *held >= self.permits)
            .unwrap();
        *held += 1;
        held
    }

    /// Returns one permit and wakes a single waiter, if any.
    ///
    /// Must be paired with a previous [`AsyncTaskSemaphore::acquire`]; the
    /// counter is decremented unconditionally.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn release(&self) {
        let mut held = self.counter.lock().unwrap();
        *held -= 1;
        // Exactly one permit became available, so one waiter is enough.
        self.cond_var.notify_one();
    }
}
67
68lazy_static! {
69 static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
70 AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
71 static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
72 .thread_name("solQuicClientRt")
73 .enable_all()
74 .build()
75 .unwrap();
76}
77
78async fn send_data_async(
79 connection: Arc<NonblockingQuicConnection>,
80 buffer: Vec<u8>,
81) -> TransportResult<()> {
82 let result = timeout(SEND_DATA_TIMEOUT, connection.send_data(&buffer)).await;
83 ASYNC_TASK_SEMAPHORE.release();
84 handle_send_result(result, connection)
85}
86
87async fn send_data_batch_async(
88 connection: Arc<NonblockingQuicConnection>,
89 buffers: Vec<Vec<u8>>,
90) -> TransportResult<()> {
91 let result = timeout(
92 u32::try_from(buffers.len())
93 .map(|size| SEND_DATA_TIMEOUT.saturating_mul(size))
94 .unwrap_or(Duration::MAX),
95 connection.send_data_batch(&buffers),
96 )
97 .await;
98 ASYNC_TASK_SEMAPHORE.release();
99 handle_send_result(result, connection)
100}
101
102fn handle_send_result(
104 result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
105 connection: Arc<NonblockingQuicConnection>,
106) -> Result<(), TransportError> {
107 match result {
108 Ok(result) => result,
109 Err(_err) => {
110 let client_stats = ClientStats::default();
111 client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
112 let stats = connection.connection_stats();
113 stats.add_client_stats(&client_stats, 0, false);
114 info!("Timedout sending data {:?}", connection.server_addr());
115 Err(TransportError::Custom("Timedout sending data".to_string()))
116 }
117 }
118}
119
120pub struct QuicClientConnection {
121 pub inner: Arc<NonblockingQuicConnection>,
122}
123
124impl QuicClientConnection {
125 pub fn new(
126 endpoint: Arc<QuicLazyInitializedEndpoint>,
127 server_addr: SocketAddr,
128 connection_stats: Arc<ConnectionCacheStats>,
129 ) -> Self {
130 let inner = Arc::new(NonblockingQuicConnection::new(
131 endpoint,
132 server_addr,
133 connection_stats,
134 ));
135 Self { inner }
136 }
137
138 pub fn new_with_client(
139 client: Arc<QuicClient>,
140 connection_stats: Arc<ConnectionCacheStats>,
141 ) -> Self {
142 let inner = Arc::new(NonblockingQuicConnection::new_with_client(
143 client,
144 connection_stats,
145 ));
146 Self { inner }
147 }
148}
149
150impl ClientConnection for QuicClientConnection {
151 fn server_addr(&self) -> &SocketAddr {
152 self.inner.server_addr()
153 }
154
155 fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
156 RUNTIME.block_on(self.inner.send_data_batch(buffers))?;
157 Ok(())
158 }
159
160 fn send_data_async(&self, data: Vec<u8>) -> TransportResult<()> {
161 let _lock = ASYNC_TASK_SEMAPHORE.acquire();
162 let inner = self.inner.clone();
163
164 let _handle = RUNTIME.spawn(send_data_async(inner, data));
165 Ok(())
166 }
167
168 fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
169 let _lock = ASYNC_TASK_SEMAPHORE.acquire();
170 let inner = self.inner.clone();
171 let _handle = RUNTIME.spawn(send_data_batch_async(inner, buffers));
172 Ok(())
173 }
174
175 fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
176 RUNTIME.block_on(self.inner.send_data(buffer))?;
177 Ok(())
178 }
179}