1use {
5 async_lock::Mutex,
6 async_trait::async_trait,
7 futures::future::TryFutureExt,
8 log::*,
9 quinn::{
10 crypto::rustls::QuicClientConfig, ClientConfig, ClosedStream, ConnectError, Connection,
11 ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig,
12 WriteError,
13 },
14 solana_connection_cache::{
15 client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
16 nonblocking::client_connection::ClientConnection,
17 },
18 solana_keypair::Keypair,
19 solana_measure::measure::Measure,
20 solana_net_utils::{SocketConfig, VALIDATOR_PORT_RANGE},
21 solana_quic_definitions::{
22 QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, QUIC_SEND_FAIRNESS,
23 },
24 solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
25 solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID,
26 solana_tls_utils::{
27 new_dummy_x509_certificate, tls_client_config_builder, QuicClientCertificate,
28 },
29 solana_transaction_error::TransportResult,
30 std::{
31 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
32 sync::{atomic::Ordering, Arc},
33 thread,
34 },
35 thiserror::Error,
36 tokio::{sync::OnceCell, time::timeout},
37};
38
/// Holder for a QUIC client endpoint that is built on first use.
///
/// `get_endpoint` initializes `endpoint` exactly once through
/// `create_endpoint` and afterwards hands out clones of the stored `Arc`.
pub struct QuicLazyInitializedEndpoint {
    /// The endpoint itself; empty until the first `get_endpoint` call.
    endpoint: OnceCell<Arc<Endpoint>>,
    /// Certificate and private key installed as the TLS client-auth identity
    /// when the endpoint's default client config is built.
    client_certificate: Arc<QuicClientCertificate>,
    /// Optional pre-built endpoint. When present it is cloned instead of
    /// binding a new UDP socket in `create_endpoint`.
    client_endpoint: Option<Endpoint>,
}
45
/// Errors produced by the QUIC client.
///
/// Every variant wraps the corresponding `quinn` error, is displayed
/// transparently, and can be created from the wrapped type via `From`
/// (so `?` works directly on `quinn` results).
#[derive(Error, Debug)]
pub enum QuicError {
    /// Writing to a send stream failed.
    #[error(transparent)]
    WriteError(#[from] WriteError),
    /// The connection failed or was lost; also used for handshake timeouts
    /// (`ConnectionError::TimedOut`) raised by this module.
    #[error(transparent)]
    ConnectionError(#[from] ConnectionError),
    /// Starting a connection attempt on the endpoint failed.
    #[error(transparent)]
    ConnectError(#[from] ConnectError),
    /// An operation was attempted on a stream that is already closed.
    #[error(transparent)]
    ClosedStream(#[from] ClosedStream),
}
57
58impl From<QuicError> for ClientErrorKind {
59 fn from(quic_error: QuicError) -> Self {
60 Self::Custom(format!("{quic_error:?}"))
61 }
62}
63
64impl QuicLazyInitializedEndpoint {
65 pub fn new(
66 client_certificate: Arc<QuicClientCertificate>,
67 client_endpoint: Option<Endpoint>,
68 ) -> Self {
69 Self {
70 endpoint: OnceCell::<Arc<Endpoint>>::new(),
71 client_certificate,
72 client_endpoint,
73 }
74 }
75
76 fn create_endpoint(&self) -> Endpoint {
77 let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
78 endpoint.clone()
79 } else {
80 let config = SocketConfig::default();
81 let client_socket = solana_net_utils::bind_in_range_with_config(
82 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
83 VALIDATOR_PORT_RANGE,
84 config,
85 )
86 .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
87 .1;
88 info!("Local endpoint is : {client_socket:?}");
89
90 QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
91 };
92
93 let mut crypto = tls_client_config_builder()
94 .with_client_auth_cert(
95 vec![self.client_certificate.certificate.clone()],
96 self.client_certificate.key.clone_key(),
97 )
98 .expect("Failed to set QUIC client certificates");
99 crypto.enable_early_data = true;
100 crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
101
102 let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap()));
103 let mut transport_config = TransportConfig::default();
104
105 let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
106 transport_config.max_idle_timeout(Some(timeout));
107 transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
108 transport_config.send_fairness(QUIC_SEND_FAIRNESS);
109 config.transport_config(Arc::new(transport_config));
110
111 endpoint.set_default_client_config(config);
112
113 endpoint
114 }
115
116 async fn get_endpoint(&self) -> Arc<Endpoint> {
117 self.endpoint
118 .get_or_init(|| async { Arc::new(self.create_endpoint()) })
119 .await
120 .clone()
121 }
122}
123
124impl Default for QuicLazyInitializedEndpoint {
125 fn default() -> Self {
126 let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
127 Self::new(
128 Arc::new(QuicClientCertificate {
129 certificate: cert,
130 key: priv_key,
131 }),
132 None,
133 )
134 }
135}
136
/// An established QUIC connection together with the endpoint it was opened on.
///
/// The endpoint is retained so `make_connection_0rtt` can open a replacement
/// connection without going through the lazy endpoint holder again.
#[derive(Clone)]
struct QuicNewConnection {
    /// Endpoint used to open `connection` and any later reconnects.
    endpoint: Arc<Endpoint>,
    /// The current connection; replaced by `make_connection_0rtt`.
    connection: Arc<Connection>,
}
144
145impl QuicNewConnection {
146 async fn make_connection(
148 endpoint: Arc<QuicLazyInitializedEndpoint>,
149 addr: SocketAddr,
150 stats: &ClientStats,
151 ) -> Result<Self, QuicError> {
152 let mut make_connection_measure = Measure::start("make_connection_measure");
153 let endpoint = endpoint.get_endpoint().await;
154
155 let connecting = endpoint.connect(addr, "connect")?;
156 stats.total_connections.fetch_add(1, Ordering::Relaxed);
157 if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
158 {
159 if connecting_result.is_err() {
160 stats.connection_errors.fetch_add(1, Ordering::Relaxed);
161 }
162 make_connection_measure.stop();
163 stats
164 .make_connection_ms
165 .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
166
167 let connection = connecting_result?;
168
169 Ok(Self {
170 endpoint,
171 connection: Arc::new(connection),
172 })
173 } else {
174 Err(ConnectionError::TimedOut.into())
175 }
176 }
177
178 fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
179 quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
180 .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
181 }
182
183 async fn make_connection_0rtt(
186 &mut self,
187 addr: SocketAddr,
188 stats: &ClientStats,
189 ) -> Result<Arc<Connection>, QuicError> {
190 let connecting = self.endpoint.connect(addr, "connect")?;
191 stats.total_connections.fetch_add(1, Ordering::Relaxed);
192 let connection = match connecting.into_0rtt() {
193 Ok((connection, zero_rtt)) => {
194 if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
195 if zero_rtt {
196 stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
197 } else {
198 stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
199 }
200 connection
201 } else {
202 return Err(ConnectionError::TimedOut.into());
203 }
204 }
205 Err(connecting) => {
206 stats.connection_errors.fetch_add(1, Ordering::Relaxed);
207
208 if let Ok(connecting_result) =
209 timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
210 {
211 connecting_result?
212 } else {
213 return Err(ConnectionError::TimedOut.into());
214 }
215 }
216 };
217 self.connection = Arc::new(connection);
218 Ok(self.connection.clone())
219 }
220}
221
/// QUIC client bound to a single server address.
pub struct QuicClient {
    /// Lazily created endpoint used to open the first connection.
    endpoint: Arc<QuicLazyInitializedEndpoint>,
    /// The cached connection, guarded by an async mutex; `None` until
    /// `_send_buffer` establishes the first connection.
    connection: Arc<Mutex<Option<QuicNewConnection>>>,
    /// Address of the server this client sends to.
    addr: SocketAddr,
    /// Stats owned by this client; exposed through `stats()`.
    stats: Arc<ClientStats>,
}
228
229impl QuicClient {
230 pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
231 Self {
232 endpoint,
233 connection: Arc::new(Mutex::new(None)),
234 addr,
235 stats: Arc::new(ClientStats::default()),
236 }
237 }
238
239 async fn _send_buffer_using_conn(
240 data: &[u8],
241 connection: &Connection,
242 ) -> Result<(), QuicError> {
243 let mut send_stream = connection.open_uni().await?;
244 send_stream.write_all(data).await?;
245 Ok(())
246 }
247
248 async fn _send_buffer(
251 &self,
252 data: &[u8],
253 stats: &ClientStats,
254 connection_stats: Arc<ConnectionCacheStats>,
255 ) -> Result<Arc<Connection>, QuicError> {
256 let mut measure_send_packet = Measure::start("send_packet_us");
257 let mut measure_prepare_connection = Measure::start("prepare_connection");
258 let mut connection_try_count = 0;
259 let mut last_connection_id = 0;
260 let mut last_error = None;
261 while connection_try_count < 2 {
262 let connection = {
263 let mut conn_guard = self.connection.lock().await;
264
265 let maybe_conn = conn_guard.as_mut();
266 match maybe_conn {
267 Some(conn) => {
268 if conn.connection.stable_id() == last_connection_id {
269 let conn = conn.make_connection_0rtt(self.addr, stats).await;
271 match conn {
272 Ok(conn) => {
273 info!(
274 "Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}",
275 self.addr,
276 conn.stable_id(),
277 connection_try_count,
278 last_connection_id,
279 last_error,
280 );
281 connection_try_count += 1;
282 conn
283 }
284 Err(err) => {
285 info!(
286 "Cannot make 0rtt connection to {}, error {:}",
287 self.addr, err
288 );
289 return Err(err);
290 }
291 }
292 } else {
293 stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
294 conn.connection.clone()
295 }
296 }
297 None => {
298 let conn = QuicNewConnection::make_connection(
299 self.endpoint.clone(),
300 self.addr,
301 stats,
302 )
303 .await;
304 match conn {
305 Ok(conn) => {
306 *conn_guard = Some(conn.clone());
307 info!(
308 "Made connection to {} id {} try_count {}, from connection cache warming?: {}",
309 self.addr,
310 conn.connection.stable_id(),
311 connection_try_count,
312 data.is_empty(),
313 );
314 connection_try_count += 1;
315 conn.connection.clone()
316 }
317 Err(err) => {
318 info!("Cannot make connection to {}, error {:}, from connection cache warming?: {}",
319 self.addr, err, data.is_empty());
320 return Err(err);
321 }
322 }
323 }
324 }
325 };
326
327 let new_stats = connection.stats();
328
329 connection_stats
330 .total_client_stats
331 .congestion_events
332 .update_stat(
333 &self.stats.congestion_events,
334 new_stats.path.congestion_events,
335 );
336
337 connection_stats
338 .total_client_stats
339 .streams_blocked_uni
340 .update_stat(
341 &self.stats.streams_blocked_uni,
342 new_stats.frame_tx.streams_blocked_uni,
343 );
344
345 connection_stats
346 .total_client_stats
347 .data_blocked
348 .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked);
349
350 connection_stats
351 .total_client_stats
352 .acks
353 .update_stat(&self.stats.acks, new_stats.frame_tx.acks);
354
355 if data.is_empty() {
356 return Ok(connection);
358 }
359
360 last_connection_id = connection.stable_id();
361 measure_prepare_connection.stop();
362
363 match Self::_send_buffer_using_conn(data, &connection).await {
364 Ok(()) => {
365 measure_send_packet.stop();
366 stats.successful_packets.fetch_add(1, Ordering::Relaxed);
367 stats
368 .send_packets_us
369 .fetch_add(measure_send_packet.as_us(), Ordering::Relaxed);
370 stats
371 .prepare_connection_us
372 .fetch_add(measure_prepare_connection.as_us(), Ordering::Relaxed);
373 trace!(
374 "Succcessfully sent to {} with id {}, thread: {:?}, data len: {}, send_packet_us: {} prepare_connection_us: {}",
375 self.addr,
376 connection.stable_id(),
377 thread::current().id(),
378 data.len(),
379 measure_send_packet.as_us(),
380 measure_prepare_connection.as_us(),
381 );
382
383 return Ok(connection);
384 }
385 Err(err) => match err {
386 QuicError::ConnectionError(_) => {
387 last_error = Some(err);
388 }
389 _ => {
390 info!(
391 "Error sending to {} with id {}, error {:?} thread: {:?}",
392 self.addr,
393 connection.stable_id(),
394 err,
395 thread::current().id(),
396 );
397 return Err(err);
398 }
399 },
400 }
401 }
402
403 info!(
405 "Ran into an error sending data {:?}, exhausted retries to {}",
406 last_error, self.addr
407 );
408 Err(last_error.expect("QuicClient::_send_buffer last_error.expect"))
411 }
412
413 pub async fn send_buffer<T>(
414 &self,
415 data: T,
416 stats: &ClientStats,
417 connection_stats: Arc<ConnectionCacheStats>,
418 ) -> Result<(), ClientErrorKind>
419 where
420 T: AsRef<[u8]>,
421 {
422 self._send_buffer(data.as_ref(), stats, connection_stats)
423 .await
424 .map_err(Into::<ClientErrorKind>::into)?;
425 Ok(())
426 }
427
428 pub async fn send_batch<T>(
429 &self,
430 buffers: &[T],
431 stats: &ClientStats,
432 connection_stats: Arc<ConnectionCacheStats>,
433 ) -> Result<(), ClientErrorKind>
434 where
435 T: AsRef<[u8]>,
436 {
437 if buffers.is_empty() {
449 return Ok(());
450 }
451 let connection = self
452 ._send_buffer(buffers[0].as_ref(), stats, connection_stats)
453 .await
454 .map_err(Into::<ClientErrorKind>::into)?;
455
456 for data in buffers[1..buffers.len()].iter() {
457 Self::_send_buffer_using_conn(data.as_ref(), &connection).await?;
458 }
459 Ok(())
460 }
461
462 pub fn server_addr(&self) -> &SocketAddr {
463 &self.addr
464 }
465
466 pub fn stats(&self) -> Arc<ClientStats> {
467 self.stats.clone()
468 }
469}
470
/// A [`QuicClient`] paired with the connection-cache stats its sends are
/// recorded into.
pub struct QuicClientConnection {
    /// The underlying client that performs the sends.
    pub client: Arc<QuicClient>,
    /// Cache-wide stats updated after each send.
    pub connection_stats: Arc<ConnectionCacheStats>,
}
475
476impl QuicClientConnection {
477 pub fn base_stats(&self) -> Arc<ClientStats> {
478 self.client.stats()
479 }
480
481 pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
482 self.connection_stats.clone()
483 }
484
485 pub fn new(
486 endpoint: Arc<QuicLazyInitializedEndpoint>,
487 addr: SocketAddr,
488 connection_stats: Arc<ConnectionCacheStats>,
489 ) -> Self {
490 let client = Arc::new(QuicClient::new(endpoint, addr));
491 Self::new_with_client(client, connection_stats)
492 }
493
494 pub fn new_with_client(
495 client: Arc<QuicClient>,
496 connection_stats: Arc<ConnectionCacheStats>,
497 ) -> Self {
498 Self {
499 client,
500 connection_stats,
501 }
502 }
503}
504
505#[async_trait]
506impl ClientConnection for QuicClientConnection {
507 fn server_addr(&self) -> &SocketAddr {
508 self.client.server_addr()
509 }
510
511 async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
512 let stats = ClientStats::default();
513 let len = buffers.len();
514 let res = self
515 .client
516 .send_batch(buffers, &stats, self.connection_stats.clone())
517 .await;
518 self.connection_stats
519 .add_client_stats(&stats, len, res.is_ok());
520 res?;
521 Ok(())
522 }
523
524 async fn send_data(&self, data: &[u8]) -> TransportResult<()> {
525 let stats = Arc::new(ClientStats::default());
526 let num_packets = if data.is_empty() { 0 } else { 1 };
528 self.client
529 .send_buffer(data, &stats, self.connection_stats.clone())
530 .map_ok(|v| {
531 self.connection_stats
532 .add_client_stats(&stats, num_packets, true);
533 v
534 })
535 .map_err(|e| {
536 warn!(
537 "Failed to send data async to {}, error: {:?} ",
538 self.server_addr(),
539 e
540 );
541 datapoint_warn!("send-wire-async", ("failure", 1, i64),);
542 self.connection_stats
543 .add_client_stats(&stats, num_packets, false);
544 e.into()
545 })
546 .await
547 }
548}