solana_quic_client/nonblocking/
quic_client.rs

//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
//! and provides an interface for sending data which is restricted by the
//! server's flow control.
4use {
5    async_lock::Mutex,
6    async_trait::async_trait,
7    futures::future::TryFutureExt,
8    log::*,
9    quinn::{
10        crypto::rustls::QuicClientConfig, ClientConfig, ClosedStream, ConnectError, Connection,
11        ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig,
12        WriteError,
13    },
14    solana_connection_cache::{
15        client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
16        nonblocking::client_connection::ClientConnection,
17    },
18    solana_keypair::Keypair,
19    solana_measure::measure::Measure,
20    solana_net_utils::{SocketConfig, VALIDATOR_PORT_RANGE},
21    solana_quic_definitions::{
22        QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, QUIC_SEND_FAIRNESS,
23    },
24    solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
25    solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID,
26    solana_tls_utils::{
27        new_dummy_x509_certificate, tls_client_config_builder, QuicClientCertificate,
28    },
29    solana_transaction_error::TransportResult,
30    std::{
31        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
32        sync::{atomic::Ordering, Arc},
33        thread,
34    },
35    thiserror::Error,
36    tokio::{sync::OnceCell, time::timeout},
37};
38
39/// A lazy-initialized Quic Endpoint
40pub struct QuicLazyInitializedEndpoint {
41    endpoint: OnceCell<Arc<Endpoint>>,
42    client_certificate: Arc<QuicClientCertificate>,
43    client_endpoint: Option<Endpoint>,
44}
45
46#[derive(Error, Debug)]
47pub enum QuicError {
48    #[error(transparent)]
49    WriteError(#[from] WriteError),
50    #[error(transparent)]
51    ConnectionError(#[from] ConnectionError),
52    #[error(transparent)]
53    ConnectError(#[from] ConnectError),
54    #[error(transparent)]
55    ClosedStream(#[from] ClosedStream),
56}
57
58impl From<QuicError> for ClientErrorKind {
59    fn from(quic_error: QuicError) -> Self {
60        Self::Custom(format!("{quic_error:?}"))
61    }
62}
63
64impl QuicLazyInitializedEndpoint {
65    pub fn new(
66        client_certificate: Arc<QuicClientCertificate>,
67        client_endpoint: Option<Endpoint>,
68    ) -> Self {
69        Self {
70            endpoint: OnceCell::<Arc<Endpoint>>::new(),
71            client_certificate,
72            client_endpoint,
73        }
74    }
75
76    fn create_endpoint(&self) -> Endpoint {
77        let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
78            endpoint.clone()
79        } else {
80            let config = SocketConfig::default();
81            let client_socket = solana_net_utils::bind_in_range_with_config(
82                IpAddr::V4(Ipv4Addr::UNSPECIFIED),
83                VALIDATOR_PORT_RANGE,
84                config,
85            )
86            .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
87            .1;
88            info!("Local endpoint is : {client_socket:?}");
89
90            QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
91        };
92
93        let mut crypto = tls_client_config_builder()
94            .with_client_auth_cert(
95                vec![self.client_certificate.certificate.clone()],
96                self.client_certificate.key.clone_key(),
97            )
98            .expect("Failed to set QUIC client certificates");
99        crypto.enable_early_data = true;
100        crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
101
102        let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap()));
103        let mut transport_config = TransportConfig::default();
104
105        let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
106        transport_config.max_idle_timeout(Some(timeout));
107        transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
108        transport_config.send_fairness(QUIC_SEND_FAIRNESS);
109        config.transport_config(Arc::new(transport_config));
110
111        endpoint.set_default_client_config(config);
112
113        endpoint
114    }
115
116    async fn get_endpoint(&self) -> Arc<Endpoint> {
117        self.endpoint
118            .get_or_init(|| async { Arc::new(self.create_endpoint()) })
119            .await
120            .clone()
121    }
122}
123
124impl Default for QuicLazyInitializedEndpoint {
125    fn default() -> Self {
126        let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
127        Self::new(
128            Arc::new(QuicClientCertificate {
129                certificate: cert,
130                key: priv_key,
131            }),
132            None,
133        )
134    }
135}
136
137/// A wrapper over NewConnection with additional capability to create the endpoint as part
138/// of creating a new connection.
139#[derive(Clone)]
140struct QuicNewConnection {
141    endpoint: Arc<Endpoint>,
142    connection: Arc<Connection>,
143}
144
145impl QuicNewConnection {
146    /// Create a QuicNewConnection given the remote address 'addr'.
147    async fn make_connection(
148        endpoint: Arc<QuicLazyInitializedEndpoint>,
149        addr: SocketAddr,
150        stats: &ClientStats,
151    ) -> Result<Self, QuicError> {
152        let mut make_connection_measure = Measure::start("make_connection_measure");
153        let endpoint = endpoint.get_endpoint().await;
154
155        let connecting = endpoint.connect(addr, "connect")?;
156        stats.total_connections.fetch_add(1, Ordering::Relaxed);
157        if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
158        {
159            if connecting_result.is_err() {
160                stats.connection_errors.fetch_add(1, Ordering::Relaxed);
161            }
162            make_connection_measure.stop();
163            stats
164                .make_connection_ms
165                .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
166
167            let connection = connecting_result?;
168
169            Ok(Self {
170                endpoint,
171                connection: Arc::new(connection),
172            })
173        } else {
174            Err(ConnectionError::TimedOut.into())
175        }
176    }
177
178    fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
179        quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
180            .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
181    }
182
183    // Attempts to make a faster connection by taking advantage of pre-existing key material.
184    // Only works if connection to this endpoint was previously established.
185    async fn make_connection_0rtt(
186        &mut self,
187        addr: SocketAddr,
188        stats: &ClientStats,
189    ) -> Result<Arc<Connection>, QuicError> {
190        let connecting = self.endpoint.connect(addr, "connect")?;
191        stats.total_connections.fetch_add(1, Ordering::Relaxed);
192        let connection = match connecting.into_0rtt() {
193            Ok((connection, zero_rtt)) => {
194                if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
195                    if zero_rtt {
196                        stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
197                    } else {
198                        stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
199                    }
200                    connection
201                } else {
202                    return Err(ConnectionError::TimedOut.into());
203                }
204            }
205            Err(connecting) => {
206                stats.connection_errors.fetch_add(1, Ordering::Relaxed);
207
208                if let Ok(connecting_result) =
209                    timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
210                {
211                    connecting_result?
212                } else {
213                    return Err(ConnectionError::TimedOut.into());
214                }
215            }
216        };
217        self.connection = Arc::new(connection);
218        Ok(self.connection.clone())
219    }
220}
221
222pub struct QuicClient {
223    endpoint: Arc<QuicLazyInitializedEndpoint>,
224    connection: Arc<Mutex<Option<QuicNewConnection>>>,
225    addr: SocketAddr,
226    stats: Arc<ClientStats>,
227}
228
229impl QuicClient {
230    pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
231        Self {
232            endpoint,
233            connection: Arc::new(Mutex::new(None)),
234            addr,
235            stats: Arc::new(ClientStats::default()),
236        }
237    }
238
239    async fn _send_buffer_using_conn(
240        data: &[u8],
241        connection: &Connection,
242    ) -> Result<(), QuicError> {
243        let mut send_stream = connection.open_uni().await?;
244        send_stream.write_all(data).await?;
245        Ok(())
246    }
247
248    // Attempts to send data, connecting/reconnecting as necessary
249    // On success, returns the connection used to successfully send the data
250    async fn _send_buffer(
251        &self,
252        data: &[u8],
253        stats: &ClientStats,
254        connection_stats: Arc<ConnectionCacheStats>,
255    ) -> Result<Arc<Connection>, QuicError> {
256        let mut measure_send_packet = Measure::start("send_packet_us");
257        let mut measure_prepare_connection = Measure::start("prepare_connection");
258        let mut connection_try_count = 0;
259        let mut last_connection_id = 0;
260        let mut last_error = None;
261        while connection_try_count < 2 {
262            let connection = {
263                let mut conn_guard = self.connection.lock().await;
264
265                let maybe_conn = conn_guard.as_mut();
266                match maybe_conn {
267                    Some(conn) => {
268                        if conn.connection.stable_id() == last_connection_id {
269                            // this is the problematic connection we had used before, create a new one
270                            let conn = conn.make_connection_0rtt(self.addr, stats).await;
271                            match conn {
272                                Ok(conn) => {
273                                    info!(
274                                        "Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}",
275                                        self.addr,
276                                        conn.stable_id(),
277                                        connection_try_count,
278                                        last_connection_id,
279                                        last_error,
280                                    );
281                                    connection_try_count += 1;
282                                    conn
283                                }
284                                Err(err) => {
285                                    info!(
286                                        "Cannot make 0rtt connection to {}, error {:}",
287                                        self.addr, err
288                                    );
289                                    return Err(err);
290                                }
291                            }
292                        } else {
293                            stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
294                            conn.connection.clone()
295                        }
296                    }
297                    None => {
298                        let conn = QuicNewConnection::make_connection(
299                            self.endpoint.clone(),
300                            self.addr,
301                            stats,
302                        )
303                        .await;
304                        match conn {
305                            Ok(conn) => {
306                                *conn_guard = Some(conn.clone());
307                                info!(
308                                    "Made connection to {} id {} try_count {}, from connection cache warming?: {}",
309                                    self.addr,
310                                    conn.connection.stable_id(),
311                                    connection_try_count,
312                                    data.is_empty(),
313                                );
314                                connection_try_count += 1;
315                                conn.connection.clone()
316                            }
317                            Err(err) => {
318                                info!("Cannot make connection to {}, error {:}, from connection cache warming?: {}",
319                                    self.addr, err, data.is_empty());
320                                return Err(err);
321                            }
322                        }
323                    }
324                }
325            };
326
327            let new_stats = connection.stats();
328
329            connection_stats
330                .total_client_stats
331                .congestion_events
332                .update_stat(
333                    &self.stats.congestion_events,
334                    new_stats.path.congestion_events,
335                );
336
337            connection_stats
338                .total_client_stats
339                .streams_blocked_uni
340                .update_stat(
341                    &self.stats.streams_blocked_uni,
342                    new_stats.frame_tx.streams_blocked_uni,
343                );
344
345            connection_stats
346                .total_client_stats
347                .data_blocked
348                .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked);
349
350            connection_stats
351                .total_client_stats
352                .acks
353                .update_stat(&self.stats.acks, new_stats.frame_tx.acks);
354
355            if data.is_empty() {
356                // no need to send packet as it is only for warming connections
357                return Ok(connection);
358            }
359
360            last_connection_id = connection.stable_id();
361            measure_prepare_connection.stop();
362
363            match Self::_send_buffer_using_conn(data, &connection).await {
364                Ok(()) => {
365                    measure_send_packet.stop();
366                    stats.successful_packets.fetch_add(1, Ordering::Relaxed);
367                    stats
368                        .send_packets_us
369                        .fetch_add(measure_send_packet.as_us(), Ordering::Relaxed);
370                    stats
371                        .prepare_connection_us
372                        .fetch_add(measure_prepare_connection.as_us(), Ordering::Relaxed);
373                    trace!(
374                        "Succcessfully sent to {} with id {}, thread: {:?}, data len: {}, send_packet_us: {} prepare_connection_us: {}",
375                        self.addr,
376                        connection.stable_id(),
377                        thread::current().id(),
378                        data.len(),
379                        measure_send_packet.as_us(),
380                        measure_prepare_connection.as_us(),
381                    );
382
383                    return Ok(connection);
384                }
385                Err(err) => match err {
386                    QuicError::ConnectionError(_) => {
387                        last_error = Some(err);
388                    }
389                    _ => {
390                        info!(
391                            "Error sending to {} with id {}, error {:?} thread: {:?}",
392                            self.addr,
393                            connection.stable_id(),
394                            err,
395                            thread::current().id(),
396                        );
397                        return Err(err);
398                    }
399                },
400            }
401        }
402
403        // if we come here, that means we have exhausted maximum retries, return the error
404        info!(
405            "Ran into an error sending data {:?}, exhausted retries to {}",
406            last_error, self.addr
407        );
408        // If we get here but last_error is None, then we have a logic error
409        // in this function, so panic here with an expect to help debugging
410        Err(last_error.expect("QuicClient::_send_buffer last_error.expect"))
411    }
412
413    pub async fn send_buffer<T>(
414        &self,
415        data: T,
416        stats: &ClientStats,
417        connection_stats: Arc<ConnectionCacheStats>,
418    ) -> Result<(), ClientErrorKind>
419    where
420        T: AsRef<[u8]>,
421    {
422        self._send_buffer(data.as_ref(), stats, connection_stats)
423            .await
424            .map_err(Into::<ClientErrorKind>::into)?;
425        Ok(())
426    }
427
428    pub async fn send_batch<T>(
429        &self,
430        buffers: &[T],
431        stats: &ClientStats,
432        connection_stats: Arc<ConnectionCacheStats>,
433    ) -> Result<(), ClientErrorKind>
434    where
435        T: AsRef<[u8]>,
436    {
437        // Start off by "testing" the connection by sending the first buffer
438        // This will also connect to the server if not already connected
439        // and reconnect and retry if the first send attempt failed
440        // (for example due to a timed out connection), returning an error
441        // or the connection that was used to successfully send the buffer.
442        // We will use the returned connection to send the rest of the buffers in the batch
443        // to avoid touching the mutex in self, and not bother reconnecting if we fail along the way
444        // since testing even in the ideal GCE environment has found no cases
445        // where reconnecting and retrying in the middle of a batch send
446        // (i.e. we encounter a connection error in the middle of a batch send, which presumably cannot
447        // be due to a timed out connection) has succeeded
448        if buffers.is_empty() {
449            return Ok(());
450        }
451        let connection = self
452            ._send_buffer(buffers[0].as_ref(), stats, connection_stats)
453            .await
454            .map_err(Into::<ClientErrorKind>::into)?;
455
456        for data in buffers[1..buffers.len()].iter() {
457            Self::_send_buffer_using_conn(data.as_ref(), &connection).await?;
458        }
459        Ok(())
460    }
461
462    pub fn server_addr(&self) -> &SocketAddr {
463        &self.addr
464    }
465
466    pub fn stats(&self) -> Arc<ClientStats> {
467        self.stats.clone()
468    }
469}
470
471pub struct QuicClientConnection {
472    pub client: Arc<QuicClient>,
473    pub connection_stats: Arc<ConnectionCacheStats>,
474}
475
476impl QuicClientConnection {
477    pub fn base_stats(&self) -> Arc<ClientStats> {
478        self.client.stats()
479    }
480
481    pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
482        self.connection_stats.clone()
483    }
484
485    pub fn new(
486        endpoint: Arc<QuicLazyInitializedEndpoint>,
487        addr: SocketAddr,
488        connection_stats: Arc<ConnectionCacheStats>,
489    ) -> Self {
490        let client = Arc::new(QuicClient::new(endpoint, addr));
491        Self::new_with_client(client, connection_stats)
492    }
493
494    pub fn new_with_client(
495        client: Arc<QuicClient>,
496        connection_stats: Arc<ConnectionCacheStats>,
497    ) -> Self {
498        Self {
499            client,
500            connection_stats,
501        }
502    }
503}
504
505#[async_trait]
506impl ClientConnection for QuicClientConnection {
507    fn server_addr(&self) -> &SocketAddr {
508        self.client.server_addr()
509    }
510
511    async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
512        let stats = ClientStats::default();
513        let len = buffers.len();
514        let res = self
515            .client
516            .send_batch(buffers, &stats, self.connection_stats.clone())
517            .await;
518        self.connection_stats
519            .add_client_stats(&stats, len, res.is_ok());
520        res?;
521        Ok(())
522    }
523
524    async fn send_data(&self, data: &[u8]) -> TransportResult<()> {
525        let stats = Arc::new(ClientStats::default());
526        // When data is empty which is from cache warmer, we are not sending packets actually, do not count it in
527        let num_packets = if data.is_empty() { 0 } else { 1 };
528        self.client
529            .send_buffer(data, &stats, self.connection_stats.clone())
530            .map_ok(|v| {
531                self.connection_stats
532                    .add_client_stats(&stats, num_packets, true);
533                v
534            })
535            .map_err(|e| {
536                warn!(
537                    "Failed to send data async to {}, error: {:?} ",
538                    self.server_addr(),
539                    e
540                );
541                datapoint_warn!("send-wire-async", ("failure", 1, i64),);
542                self.connection_stats
543                    .add_client_stats(&stats, num_packets, false);
544                e.into()
545            })
546            .await
547    }
548}