solana_client/
connection_cache.rs

1pub use solana_connection_cache::connection_cache::Protocol;
2use {
3    quinn::Endpoint,
4    solana_connection_cache::{
5        client_connection::ClientConnection,
6        connection_cache::{
7            BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
8            NewConnectionConfig,
9        },
10    },
11    solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
12    solana_sdk::{
13        pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair,
14        transport::Result as TransportResult,
15    },
16    solana_streamer::streamer::StakedNodes,
17    solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
18    std::{
19        net::{IpAddr, Ipv4Addr, SocketAddr},
20        sync::{Arc, RwLock},
21    },
22};
23
24const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
25const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
26
27/// A thin wrapper over connection-cache/ConnectionCache to ease
28/// construction of the ConnectionCache for code dealing both with udp and quic.
29/// For the scenario only using udp or quic, use connection-cache/ConnectionCache directly.
30pub enum ConnectionCache {
31    Quic(Arc<BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>>),
32    Udp(Arc<BackendConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>>),
33}
34
35type QuicBaseClientConnection = <QuicPool as ConnectionPool>::BaseClientConnection;
36type UdpBaseClientConnection = <UdpPool as ConnectionPool>::BaseClientConnection;
37
38pub enum BlockingClientConnection {
39    Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
40    Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
41}
42
43pub enum NonblockingClientConnection {
44    Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
45    Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
46}
47
48impl NotifyKeyUpdate for ConnectionCache {
49    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
50        match self {
51            Self::Udp(_) => Ok(()),
52            Self::Quic(backend) => backend.update_key(key),
53        }
54    }
55}
56
57impl ConnectionCache {
58    pub fn new(name: &'static str) -> Self {
59        if DEFAULT_CONNECTION_CACHE_USE_QUIC {
60            let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
61            ConnectionCache::new_with_client_options(
62                name,
63                DEFAULT_CONNECTION_POOL_SIZE,
64                None, // client_endpoint
65                Some(cert_info),
66                None, // stake_info
67            )
68        } else {
69            ConnectionCache::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE)
70        }
71    }
72
73    /// Create a quic connection_cache
74    pub fn new_quic(name: &'static str, connection_pool_size: usize) -> Self {
75        Self::new_with_client_options(name, connection_pool_size, None, None, None)
76    }
77
78    /// Create a quic connection_cache with more client options
79    pub fn new_with_client_options(
80        name: &'static str,
81        connection_pool_size: usize,
82        client_endpoint: Option<Endpoint>,
83        cert_info: Option<(&Keypair, IpAddr)>,
84        stake_info: Option<(&Arc<RwLock<StakedNodes>>, &Pubkey)>,
85    ) -> Self {
86        // The minimum pool size is 1.
87        let connection_pool_size = 1.max(connection_pool_size);
88        let mut config = QuicConfig::new().unwrap();
89        if let Some(client_endpoint) = client_endpoint {
90            config.update_client_endpoint(client_endpoint);
91        }
92        if let Some(cert_info) = cert_info {
93            config.update_client_certificate(cert_info.0, cert_info.1);
94        }
95        if let Some(stake_info) = stake_info {
96            config.set_staked_nodes(stake_info.0, stake_info.1);
97        }
98        let connection_manager = QuicConnectionManager::new_with_connection_config(config);
99        let cache =
100            BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
101        Self::Quic(Arc::new(cache))
102    }
103
104    #[inline]
105    pub fn protocol(&self) -> Protocol {
106        match self {
107            Self::Quic(_) => Protocol::QUIC,
108            Self::Udp(_) => Protocol::UDP,
109        }
110    }
111
112    pub fn with_udp(name: &'static str, connection_pool_size: usize) -> Self {
113        // The minimum pool size is 1.
114        let connection_pool_size = 1.max(connection_pool_size);
115        let connection_manager = UdpConnectionManager::default();
116        let cache =
117            BackendConnectionCache::new(name, connection_manager, connection_pool_size).unwrap();
118        Self::Udp(Arc::new(cache))
119    }
120
121    pub fn use_quic(&self) -> bool {
122        matches!(self, Self::Quic(_))
123    }
124
125    pub fn get_connection(&self, addr: &SocketAddr) -> BlockingClientConnection {
126        match self {
127            Self::Quic(cache) => BlockingClientConnection::Quic(cache.get_connection(addr)),
128            Self::Udp(cache) => BlockingClientConnection::Udp(cache.get_connection(addr)),
129        }
130    }
131
132    pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingClientConnection {
133        match self {
134            Self::Quic(cache) => {
135                NonblockingClientConnection::Quic(cache.get_nonblocking_connection(addr))
136            }
137            Self::Udp(cache) => {
138                NonblockingClientConnection::Udp(cache.get_nonblocking_connection(addr))
139            }
140        }
141    }
142}
143
144macro_rules! dispatch {
145    ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
146        #[inline]
147        $(#[$meta])*
148        $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&self $(, $arg:$ty)*) $(-> $out)? {
149            match self {
150                Self::Quic(this) => this.$name($($arg, )*),
151                Self::Udp(this) => this.$name($($arg, )*),
152            }
153        }
154    };
155    ($(#[$meta:meta])* $vis:vis fn $name:ident$(<$($t:ident: $cons:ident + ?Sized),*>)?(&mut self $(, $arg:ident: $ty:ty)*) $(-> $out:ty)?) => {
156        #[inline]
157        $(#[$meta])*
158        $vis fn $name$(<$($t: $cons + ?Sized),*>)?(&mut self $(, $arg:$ty)*) $(-> $out)? {
159            match self {
160                Self::Quic(this) => this.$name($($arg, )*),
161                Self::Udp(this) => this.$name($($arg, )*),
162            }
163        }
164    };
165}
166
167pub(crate) use dispatch;
168
169impl ClientConnection for BlockingClientConnection {
170    dispatch!(fn server_addr(&self) -> &SocketAddr);
171    dispatch!(fn send_data(&self, buffer: &[u8]) -> TransportResult<()>);
172    dispatch!(fn send_data_async(&self, buffer: Vec<u8>) -> TransportResult<()>);
173    dispatch!(fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>);
174    dispatch!(fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>);
175}
176
177#[async_trait::async_trait]
178impl solana_connection_cache::nonblocking::client_connection::ClientConnection
179    for NonblockingClientConnection
180{
181    dispatch!(fn server_addr(&self) -> &SocketAddr);
182
183    async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
184        match self {
185            Self::Quic(cache) => Ok(cache.send_data(buffer).await?),
186            Self::Udp(cache) => Ok(cache.send_data(buffer).await?),
187        }
188    }
189
190    async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
191        match self {
192            Self::Quic(cache) => Ok(cache.send_data_batch(buffers).await?),
193            Self::Udp(cache) => Ok(cache.send_data_batch(buffers).await?),
194        }
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use {
201        super::*,
202        crate::connection_cache::ConnectionCache,
203        crossbeam_channel::unbounded,
204        solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
205        solana_streamer::{
206            nonblocking::quic::{
207                DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
208                DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
209            },
210            quic::SpawnServerResult,
211            streamer::StakedNodes,
212        },
213        std::{
214            net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
215            sync::{
216                atomic::{AtomicBool, Ordering},
217                Arc, RwLock,
218            },
219        },
220    };
221
222    fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair) {
223        (
224            UdpSocket::bind("127.0.0.1:0").unwrap(),
225            Arc::new(AtomicBool::new(false)),
226            Keypair::new(),
227        )
228    }
229
230    #[test]
231    fn test_connection_with_specified_client_endpoint() {
232        // Start a response receiver:
233        let (response_recv_socket, response_recv_exit, keypair2) = server_args();
234        let (sender2, _receiver2) = unbounded();
235
236        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
237
238        let SpawnServerResult {
239            endpoints: mut response_recv_endpoints,
240            thread: response_recv_thread,
241            key_updater: _,
242        } = solana_streamer::quic::spawn_server(
243            "solQuicTest",
244            "quic_streamer_test",
245            response_recv_socket,
246            &keypair2,
247            sender2,
248            response_recv_exit.clone(),
249            1,
250            staked_nodes,
251            10,
252            10,
253            DEFAULT_MAX_STREAMS_PER_MS,
254            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
255            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
256            DEFAULT_TPU_COALESCE,
257        )
258        .unwrap();
259
260        let response_recv_endpoint = response_recv_endpoints
261            .pop()
262            .expect("at least one endpoint");
263        let connection_cache = ConnectionCache::new_with_client_options(
264            "connection_cache_test",
265            1,                            // connection_pool_size
266            Some(response_recv_endpoint), // client_endpoint
267            None,                         // cert_info
268            None,                         // stake_info
269        );
270
271        // server port 1:
272        let port1 = 9001;
273        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1);
274        let conn = connection_cache.get_connection(&addr);
275        assert_eq!(conn.server_addr().port(), port1);
276
277        // server port 2:
278        let port2 = 9002;
279        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2);
280        let conn = connection_cache.get_connection(&addr);
281        assert_eq!(conn.server_addr().port(), port2);
282
283        response_recv_exit.store(true, Ordering::Relaxed);
284        response_recv_thread.join().unwrap();
285    }
286}