solana_quic_client/
lib.rs

1#![allow(clippy::arithmetic_side_effects)]
2
3pub mod nonblocking;
4pub mod quic_client;
5
6#[macro_use]
7extern crate solana_metrics;
8
9use {
10    crate::{
11        nonblocking::quic_client::{
12            QuicClient, QuicClientConnection as NonblockingQuicClientConnection,
13            QuicLazyInitializedEndpoint,
14        },
15        quic_client::QuicClientConnection as BlockingQuicClientConnection,
16    },
17    quinn::Endpoint,
18    solana_connection_cache::{
19        connection_cache::{
20            BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
21            ConnectionPoolError, NewConnectionConfig, Protocol,
22        },
23        connection_cache_stats::ConnectionCacheStats,
24    },
25    solana_keypair::Keypair,
26    solana_pubkey::Pubkey,
27    solana_signer::Signer,
28    solana_streamer::streamer::StakedNodes,
29    solana_tls_utils::{new_dummy_x509_certificate, QuicClientCertificate},
30    std::{
31        net::{IpAddr, SocketAddr},
32        sync::{Arc, RwLock},
33    },
34};
35
36pub struct QuicPool {
37    connections: Vec<Arc<Quic>>,
38    endpoint: Arc<QuicLazyInitializedEndpoint>,
39}
40impl ConnectionPool for QuicPool {
41    type BaseClientConnection = Quic;
42    type NewConnectionConfig = QuicConfig;
43
44    fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
45        let connection = self.create_pool_entry(config, addr);
46        let idx = self.connections.len();
47        self.connections.push(connection);
48        idx
49    }
50
51    fn num_connections(&self) -> usize {
52        self.connections.len()
53    }
54
55    fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
56        self.connections
57            .get(index)
58            .cloned()
59            .ok_or(ConnectionPoolError::IndexOutOfRange)
60    }
61
62    fn create_pool_entry(
63        &self,
64        _config: &Self::NewConnectionConfig,
65        addr: &SocketAddr,
66    ) -> Arc<Self::BaseClientConnection> {
67        Arc::new(Quic(Arc::new(QuicClient::new(
68            self.endpoint.clone(),
69            *addr,
70        ))))
71    }
72}
73
74pub struct QuicConfig {
75    // Arc to prevent having to copy the struct
76    client_certificate: RwLock<Arc<QuicClientCertificate>>,
77    maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
78    maybe_client_pubkey: Option<Pubkey>,
79
80    // The optional specified endpoint for the quic based client connections
81    // If not specified, the connection cache will create as needed.
82    client_endpoint: Option<Endpoint>,
83}
84
85impl Clone for QuicConfig {
86    fn clone(&self) -> Self {
87        let cert_guard = self.client_certificate.read().unwrap();
88        QuicConfig {
89            client_certificate: RwLock::new(cert_guard.clone()),
90            maybe_staked_nodes: self.maybe_staked_nodes.clone(),
91            maybe_client_pubkey: self.maybe_client_pubkey,
92            client_endpoint: self.client_endpoint.clone(),
93        }
94    }
95}
96
97impl NewConnectionConfig for QuicConfig {
98    fn new() -> Result<Self, ClientError> {
99        let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
100        Ok(Self {
101            client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
102                certificate: cert,
103                key: priv_key,
104            })),
105            maybe_staked_nodes: None,
106            maybe_client_pubkey: None,
107            client_endpoint: None,
108        })
109    }
110}
111
112impl QuicConfig {
113    fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
114        let cert_guard = self.client_certificate.read().unwrap();
115        QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
116    }
117
118    pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
119        let (cert, priv_key) = new_dummy_x509_certificate(keypair);
120
121        let mut cert_guard = self.client_certificate.write().unwrap();
122
123        *cert_guard = Arc::new(QuicClientCertificate {
124            certificate: cert,
125            key: priv_key,
126        });
127    }
128
129    pub fn update_keypair(&self, keypair: &Keypair) {
130        let (cert, priv_key) = new_dummy_x509_certificate(keypair);
131
132        let mut cert_guard = self.client_certificate.write().unwrap();
133
134        *cert_guard = Arc::new(QuicClientCertificate {
135            certificate: cert,
136            key: priv_key,
137        });
138    }
139
140    pub fn set_staked_nodes(
141        &mut self,
142        staked_nodes: &Arc<RwLock<StakedNodes>>,
143        client_pubkey: &Pubkey,
144    ) {
145        self.maybe_staked_nodes = Some(staked_nodes.clone());
146        self.maybe_client_pubkey = Some(*client_pubkey);
147    }
148
149    pub fn update_client_endpoint(&mut self, client_endpoint: Endpoint) {
150        self.client_endpoint = Some(client_endpoint);
151    }
152}
153
154pub struct Quic(Arc<QuicClient>);
155impl BaseClientConnection for Quic {
156    type BlockingClientConnection = BlockingQuicClientConnection;
157    type NonblockingClientConnection = NonblockingQuicClientConnection;
158
159    fn new_blocking_connection(
160        &self,
161        _addr: SocketAddr,
162        stats: Arc<ConnectionCacheStats>,
163    ) -> Arc<Self::BlockingClientConnection> {
164        Arc::new(BlockingQuicClientConnection::new_with_client(
165            self.0.clone(),
166            stats,
167        ))
168    }
169
170    fn new_nonblocking_connection(
171        &self,
172        _addr: SocketAddr,
173        stats: Arc<ConnectionCacheStats>,
174    ) -> Arc<Self::NonblockingClientConnection> {
175        Arc::new(NonblockingQuicClientConnection::new_with_client(
176            self.0.clone(),
177            stats,
178        ))
179    }
180}
181
182pub struct QuicConnectionManager {
183    connection_config: QuicConfig,
184}
185
186impl ConnectionManager for QuicConnectionManager {
187    type ConnectionPool = QuicPool;
188    type NewConnectionConfig = QuicConfig;
189
190    const PROTOCOL: Protocol = Protocol::QUIC;
191
192    fn new_connection_pool(&self) -> Self::ConnectionPool {
193        QuicPool {
194            connections: Vec::default(),
195            endpoint: Arc::new(self.connection_config.create_endpoint()),
196        }
197    }
198
199    fn new_connection_config(&self) -> QuicConfig {
200        self.connection_config.clone()
201    }
202
203    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
204        self.connection_config.update_keypair(key);
205        Ok(())
206    }
207}
208
209impl QuicConnectionManager {
210    pub fn new_with_connection_config(connection_config: QuicConfig) -> Self {
211        Self { connection_config }
212    }
213}
214
215pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
216
217pub fn new_quic_connection_cache(
218    name: &'static str,
219    keypair: &Keypair,
220    ipaddr: IpAddr,
221    staked_nodes: &Arc<RwLock<StakedNodes>>,
222    connection_pool_size: usize,
223) -> Result<QuicConnectionCache, ClientError> {
224    let mut config = QuicConfig::new()?;
225    config.update_client_certificate(keypair, ipaddr);
226    config.set_staked_nodes(staked_nodes, &keypair.pubkey());
227    let connection_manager = QuicConnectionManager::new_with_connection_config(config);
228    ConnectionCache::new(name, connection_manager, connection_pool_size)
229}