1pub use solana_connection_cache::connection_cache::Protocol;
2use {
3 quinn::Endpoint,
4 solana_connection_cache::{
5 client_connection::ClientConnection,
6 connection_cache::{
7 BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
8 NewConnectionConfig,
9 },
10 },
11 solana_keypair::Keypair,
12 solana_pubkey::Pubkey,
13 solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
14 solana_quic_definitions::NotifyKeyUpdate,
15 solana_streamer::streamer::StakedNodes,
16 solana_transaction_error::TransportResult,
17 solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
18 std::{
19 net::{IpAddr, Ipv4Addr, SocketAddr},
20 sync::{Arc, RwLock},
21 },
22};
23
/// Pool size passed along by `ConnectionCache::new` for either transport.
const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
/// Selects the transport built by `ConnectionCache::new`: QUIC when `true`, UDP when `false`.
const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
26
/// Protocol-selecting wrapper around the backend connection cache.
///
/// Each variant holds a shared (`Arc`) backend cache specialised for one transport.
pub enum ConnectionCache {
    /// Backend cache parameterised with the QUIC pool, connection manager and config.
    Quic(Arc<BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>>),
    /// Backend cache parameterised with the UDP pool, connection manager and config.
    Udp(Arc<BackendConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>>),
}
34
// Shorthand for the base connection type each pool declares through `ConnectionPool`;
// used below to name the blocking / nonblocking connection types per transport.
type QuicBaseClientConnection = <QuicPool as ConnectionPool>::BaseClientConnection;
type UdpBaseClientConnection = <UdpPool as ConnectionPool>::BaseClientConnection;
37
/// Blocking connection handle returned by `ConnectionCache::get_connection`.
///
/// The variant mirrors the transport of the cache that produced it.
pub enum BlockingClientConnection {
    /// Shared blocking connection of the QUIC base connection type.
    Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
    /// Shared blocking connection of the UDP base connection type.
    Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
}
42
/// Nonblocking connection handle returned by `ConnectionCache::get_nonblocking_connection`.
///
/// The variant mirrors the transport of the cache that produced it.
pub enum NonblockingClientConnection {
    /// Shared nonblocking connection of the QUIC base connection type.
    Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
    /// Shared nonblocking connection of the UDP base connection type.
    Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
}
47
48impl NotifyKeyUpdate for ConnectionCache {
49 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
50 match self {
51 Self::Udp(_) => Ok(()),
52 Self::Quic(backend) => backend.update_key(key),
53 }
54 }
55}
56
57impl ConnectionCache {
58 pub fn new(name: &'static str) -> Self {
59 if DEFAULT_CONNECTION_CACHE_USE_QUIC {
60 let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
61 ConnectionCache::new_with_client_options(
62 name,
63 DEFAULT_CONNECTION_POOL_SIZE,
64 None, Some(cert_info),
66 None, )
68 } else {
69 ConnectionCache::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE)
70 }
71 }
72
73 pub fn new_quic(name: &'static str, connection_pool_size: usize) -> Self {
75 Self::new_with_client_options(name, connection_pool_size, None, None, None)
76 }
77
78 pub fn new_with_client_options(
80 name: &'static str,
81 connection_pool_size: usize,
82 client_endpoint: Option<Endpoint>,
83 cert_info: Option<(&Keypair, IpAddr)>,
84 stake_info: Option<(&Arc<RwLock<StakedNodes>>, &Pubkey)>,
85 ) -> Self {
86 let connection_pool_size = 1.max(connection_pool_size);
88 let mut config = QuicConfig::new().unwrap();
89 if let Some(client_endpoint) = client_endpoint {
90 config.update_client_endpoint(client_endpoint);
91 }
92 if let Some(cert_info) = cert_info {
93 config.update_client_certificate(cert_info.0, cert_info.1);
94 }
95 if let Some(stake_info) = stake_info {
96 config.set_staked_nodes(stake_info.0, stake_info.1);
97 }
98 let connection_manager = QuicConnectionManager::new_with_connection_config(config);
99 let cache =
100 BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
101 Self::Quic(Arc::new(cache))
102 }
103
104 #[inline]
105 pub fn protocol(&self) -> Protocol {
106 match self {
107 Self::Quic(_) => Protocol::QUIC,
108 Self::Udp(_) => Protocol::UDP,
109 }
110 }
111
112 pub fn with_udp(name: &'static str, connection_pool_size: usize) -> Self {
113 let connection_pool_size = 1.max(connection_pool_size);
115 let connection_manager = UdpConnectionManager::default();
116 let cache =
117 BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
118 Self::Udp(Arc::new(cache))
119 }
120
121 pub fn use_quic(&self) -> bool {
122 matches!(self, Self::Quic(_))
123 }
124
125 pub fn get_connection(&self, addr: &SocketAddr) -> BlockingClientConnection {
126 match self {
127 Self::Quic(cache) => BlockingClientConnection::Quic(cache.get_connection(addr)),
128 Self::Udp(cache) => BlockingClientConnection::Udp(cache.get_connection(addr)),
129 }
130 }
131
132 pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingClientConnection {
133 match self {
134 Self::Quic(cache) => {
135 NonblockingClientConnection::Quic(cache.get_nonblocking_connection(addr))
136 }
137 Self::Udp(cache) => {
138 NonblockingClientConnection::Udp(cache.get_nonblocking_connection(addr))
139 }
140 }
141 }
142}
143
/// Generates a method that forwards to the same-named method on the value held
/// by whichever of the `Quic` / `Udp` variants `self` currently is.
///
/// The input is a method signature without a body: optional attributes, an
/// optional visibility, `fn name`, an optional generic list whose parameters all
/// have the form `T: Bound + ?Sized`, a `&self` or `&mut self` receiver followed
/// by `name: Type` arguments, and an optional return type. The generated method
/// is marked `#[inline]` and passes the arguments through unchanged.
macro_rules! dispatch {
    // `&self` receiver.
    ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
        #[inline]
        $(#[$meta])*
        $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&self $(, $arg:$ty)*) $(-> $out)? {
            match self {
                Self::Quic(this) => this.$name($($arg, )*),
                Self::Udp(this) => this.$name($($arg, )*),
            }
        }
    };
    // `&mut self` receiver; otherwise identical to the arm above.
    ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&mut self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
        #[inline]
        $(#[$meta])*
        $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&mut self $(, $arg:$ty)*) $(-> $out)? {
            match self {
                Self::Quic(this) => this.$name($($arg, )*),
                Self::Udp(this) => this.$name($($arg, )*),
            }
        }
    };
}

// Make the macro importable by path from elsewhere in this crate.
pub(crate) use dispatch;
168
169impl ClientConnection for BlockingClientConnection {
170 dispatch!(fn server_addr(&self) -> &SocketAddr);
171 dispatch!(fn send_data(&self, buffer: &[u8]) -> TransportResult<()>);
172 dispatch!(fn send_data_async(&self, buffer: Vec<u8>) -> TransportResult<()>);
173 dispatch!(fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>);
174 dispatch!(fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>);
175}
176
177#[async_trait::async_trait]
178impl solana_connection_cache::nonblocking::client_connection::ClientConnection
179 for NonblockingClientConnection
180{
181 dispatch!(fn server_addr(&self) -> &SocketAddr);
182
183 async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
184 match self {
185 Self::Quic(cache) => Ok(cache.send_data(buffer).await?),
186 Self::Udp(cache) => Ok(cache.send_data(buffer).await?),
187 }
188 }
189
190 async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
191 match self {
192 Self::Quic(cache) => Ok(cache.send_data_batch(buffers).await?),
193 Self::Udp(cache) => Ok(cache.send_data_batch(buffers).await?),
194 }
195 }
196}
197
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::connection_cache::ConnectionCache,
        crossbeam_channel::unbounded,
        solana_keypair::Keypair,
        solana_net_utils::bind_to_localhost,
        solana_streamer::{
            quic::{QuicServerParams, SpawnServerResult},
            streamer::StakedNodes,
        },
        std::{
            net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc, RwLock,
            },
        },
    };

    /// Produces the pieces needed to spawn a test QUIC server: a socket bound to
    /// localhost, an exit flag initialised to `false`, and a new keypair.
    fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair) {
        let socket = bind_to_localhost().unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        (socket, exit, keypair)
    }

    /// A cache built on a caller-supplied endpoint must hand out connections whose
    /// reported server address matches the address that was requested.
    #[test]
    fn test_connection_with_specified_client_endpoint() {
        let (server_socket, exit_flag, server_keypair) = server_args();
        let (packet_sender, _packet_receiver) = unbounded();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let server_params = QuicServerParams {
            max_connections_per_peer: 1,
            max_staked_connections: 10,
            max_unstaked_connections: 10,
            ..QuicServerParams::default()
        };
        let SpawnServerResult {
            endpoints: mut server_endpoints,
            thread: server_thread,
            key_updater: _,
        } = solana_streamer::quic::spawn_server(
            "solQuicTest",
            "quic_streamer_test",
            server_socket,
            &server_keypair,
            packet_sender,
            exit_flag.clone(),
            staked_nodes,
            server_params,
        )
        .unwrap();

        // Reuse one of the spawned server's endpoints as the cache's client endpoint.
        let shared_endpoint = server_endpoints.pop().expect("at least one endpoint");
        let connection_cache = ConnectionCache::new_with_client_options(
            "connection_cache_test",
            1,                     // connection_pool_size
            Some(shared_endpoint), // client_endpoint
            None,                  // cert_info
            None,                  // stake_info
        );

        // Request a connection for each of two server ports and check the port it reports.
        for server_port in [9001, 9002] {
            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
            let conn = connection_cache.get_connection(&addr);
            assert_eq!(conn.server_addr().port(), server_port);
        }

        // Signal the server to stop and wait for its thread to finish.
        exit_flag.store(true, Ordering::Relaxed);
        server_thread.join().unwrap();
    }
}