// safecoin_client/connection_cache.rs

1use {
2    crate::{
3        nonblocking::{
4            quic_client::{QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint},
5            tpu_connection::NonblockingConnection,
6        },
7        tpu_connection::{BlockingConnection, ClientStats},
8    },
9    indexmap::map::{Entry, IndexMap},
10    rand::{thread_rng, Rng},
11    safecoin_measure::measure::Measure,
12    solana_sdk::{
13        pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval,
14    },
15    solana_streamer::{
16        nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
17        streamer::StakedNodes,
18        tls_certificates::new_self_signed_tls_certificate_chain,
19    },
20    std::{
21        error::Error,
22        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
23        sync::{
24            atomic::{AtomicU64, Ordering},
25            Arc, RwLock,
26        },
27    },
28};
29
// Upper bound on the number of remote addresses held in the connection cache
// map; reaching it triggers random eviction in `create_connection`.
// Should be non-zero. A compile-time `const` is idiomatic here — there is no
// interior mutability, so `static` gains nothing.
const MAX_CONNECTIONS: usize = 1024;

/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = true;

/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;

/// Whether plain-UDP TPU transport is enabled by default.
pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
41
/// Aggregate metrics for the connection cache. All counters are atomics so
/// they can be bumped concurrently from many callers and drained by
/// `report()`.
#[derive(Default)]
pub struct ConnectionCacheStats {
    // Lookups that found a full enough pool for the target address.
    cache_hits: AtomicU64,
    // Lookups that had to create (or top up) a pool.
    cache_misses: AtomicU64,
    // Entries randomly evicted because the map hit MAX_CONNECTIONS.
    cache_evictions: AtomicU64,
    // Time spent inside the eviction loop, in ms.
    eviction_time_ms: AtomicU64,
    // Packets submitted via add_client_stats, summed across batches.
    sent_packets: AtomicU64,
    // One per add_client_stats call; split below into success/failure.
    total_batches: AtomicU64,
    batch_success: AtomicU64,
    batch_failure: AtomicU64,
    // Timing breakdown of get_connection: total, lock wait, and the
    // hit/miss paths of the map lookup, all in ms.
    get_connection_ms: AtomicU64,
    get_connection_lock_ms: AtomicU64,
    get_connection_hit_ms: AtomicU64,
    get_connection_miss_ms: AtomicU64,

    // Need to track these separately per-connection
    // because we need to track the base stat value from quinn
    pub total_client_stats: ClientStats,
}
61
62const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
63
impl ConnectionCacheStats {
    /// Fold one client's per-send `ClientStats` deltas and batch outcome into
    /// the aggregate counters.
    ///
    /// Each field of `client_stats` is read with a relaxed load and added to
    /// the matching `total_client_stats` counter. `num_packets` is the size
    /// of the batch just sent; `is_success` records whether the whole batch
    /// succeeded.
    pub fn add_client_stats(
        &self,
        client_stats: &ClientStats,
        num_packets: usize,
        is_success: bool,
    ) {
        self.total_client_stats.total_connections.fetch_add(
            client_stats.total_connections.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.connection_reuse.fetch_add(
            client_stats.connection_reuse.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.connection_errors.fetch_add(
            client_stats.connection_errors.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.zero_rtt_accepts.fetch_add(
            client_stats.zero_rtt_accepts.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.zero_rtt_rejects.fetch_add(
            client_stats.zero_rtt_rejects.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.make_connection_ms.fetch_add(
            client_stats.make_connection_ms.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.send_timeout.fetch_add(
            client_stats.send_timeout.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.sent_packets
            .fetch_add(num_packets as u64, Ordering::Relaxed);
        // Exactly one batch per call, tallied as success or failure.
        self.total_batches.fetch_add(1, Ordering::Relaxed);
        if is_success {
            self.batch_success.fetch_add(1, Ordering::Relaxed);
        } else {
            self.batch_failure.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Emit all counters as one `quic-client-connection-stats` datapoint and
    /// zero them (`swap(0, ..)` / `load_and_reset`), so every report covers
    /// only the interval since the previous report.
    fn report(&self) {
        datapoint_info!(
            "quic-client-connection-stats",
            (
                "cache_hits",
                self.cache_hits.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "cache_misses",
                self.cache_misses.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "cache_evictions",
                self.cache_evictions.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "eviction_time_ms",
                self.eviction_time_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_ms",
                self.get_connection_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_lock_ms",
                self.get_connection_lock_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_hit_ms",
                self.get_connection_hit_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_miss_ms",
                self.get_connection_miss_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "make_connection_ms",
                self.total_client_stats
                    .make_connection_ms
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "total_connections",
                self.total_client_stats
                    .total_connections
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_reuse",
                self.total_client_stats
                    .connection_reuse
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_errors",
                self.total_client_stats
                    .connection_errors
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "zero_rtt_accepts",
                self.total_client_stats
                    .zero_rtt_accepts
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "zero_rtt_rejects",
                self.total_client_stats
                    .zero_rtt_rejects
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "congestion_events",
                self.total_client_stats.congestion_events.load_and_reset(),
                i64
            ),
            (
                "tx_streams_blocked_uni",
                self.total_client_stats
                    .tx_streams_blocked_uni
                    .load_and_reset(),
                i64
            ),
            (
                "tx_data_blocked",
                self.total_client_stats.tx_data_blocked.load_and_reset(),
                i64
            ),
            (
                "tx_acks",
                self.total_client_stats.tx_acks.load_and_reset(),
                i64
            ),
            (
                "num_packets",
                self.sent_packets.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "total_batches",
                self.total_batches.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "batch_failure",
                self.batch_failure.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "send_timeout",
                self.total_client_stats
                    .send_timeout
                    .swap(0, Ordering::Relaxed),
                i64
            ),
        );
    }
}
241
/// Cache of TPU connections (QUIC or UDP), keyed by remote socket address,
/// with a bounded number of entries and a per-address connection pool.
pub struct ConnectionCache {
    // Address -> pool map; read-locked on the fast path, write-locked only
    // when a pool must be created, grown, or evicted.
    map: RwLock<IndexMap<SocketAddr, ConnectionPool>>,
    stats: Arc<ConnectionCacheStats>,
    // Rate-limits stat reporting to CONNECTION_STAT_SUBMISSION_INTERVAL.
    last_stats: AtomicInterval,
    // Target number of connections per remote address (min 1).
    connection_pool_size: usize,
    // Shared local socket used by all UDP connections.
    tpu_udp_socket: Arc<UdpSocket>,
    // TLS identity presented by QUIC endpoints created by this cache.
    client_certificate: Arc<QuicClientCertificate>,
    use_quic: bool,
    // Optional stake info used to size the QUIC stream limit; both must be
    // set (see set_staked_nodes) for the client to count as staked.
    maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
    maybe_client_pubkey: Option<Pubkey>,
}
253
/// Models the pool of connections
struct ConnectionPool {
    /// The connections in the pool
    connections: Vec<Arc<BaseTpuConnection>>,

    /// Connections in this pool share the same endpoint.
    /// `None` when the pool holds UDP connections (see `create_endpoint`).
    endpoint: Option<Arc<QuicLazyInitializedEndpoint>>,
}
262
263impl ConnectionPool {
264    /// Get a connection from the pool. It must have at least one connection in the pool.
265    /// This randomly picks a connection in the pool.
266    fn borrow_connection(&self) -> Arc<BaseTpuConnection> {
267        let mut rng = thread_rng();
268        let n = rng.gen_range(0, self.connections.len());
269        self.connections[n].clone()
270    }
271
272    /// Check if we need to create a new connection. If the count of the connections
273    /// is smaller than the pool size.
274    fn need_new_connection(&self, required_pool_size: usize) -> bool {
275        self.connections.len() < required_pool_size
276    }
277}
278
impl ConnectionCache {
    /// Create a QUIC-enabled cache whose per-address pools hold up to
    /// `connection_pool_size` connections (clamped to a minimum of 1).
    pub fn new(connection_pool_size: usize) -> Self {
        // The minimum pool size is 1.
        let connection_pool_size = 1.max(connection_pool_size);
        Self {
            use_quic: true,
            connection_pool_size,
            ..Self::default()
        }
    }

    /// Replace the client TLS certificate with one self-signed by `keypair`
    /// for `ipaddr`. Endpoints created afterwards pick up the new
    /// certificate; existing pools keep the endpoint (and certificate) they
    /// were created with.
    pub fn update_client_certificate(
        &mut self,
        keypair: &Keypair,
        ipaddr: IpAddr,
    ) -> Result<(), Box<dyn Error>> {
        let (certs, priv_key) = new_self_signed_tls_certificate_chain(keypair, ipaddr)?;
        self.client_certificate = Arc::new(QuicClientCertificate {
            certificates: certs,
            key: priv_key,
        });
        Ok(())
    }

    /// Register the stake map and this client's pubkey so
    /// `compute_max_parallel_streams` can size stream limits by stake.
    pub fn set_staked_nodes(
        &mut self,
        staked_nodes: &Arc<RwLock<StakedNodes>>,
        client_pubkey: &Pubkey,
    ) {
        self.maybe_staked_nodes = Some(staked_nodes.clone());
        self.maybe_client_pubkey = Some(*client_pubkey);
    }

    /// Create a UDP-only cache (QUIC disabled) with the given pool size.
    pub fn with_udp(connection_pool_size: usize) -> Self {
        // The minimum pool size is 1.
        let connection_pool_size = 1.max(connection_pool_size);
        Self {
            use_quic: false,
            connection_pool_size,
            ..Self::default()
        }
    }

    /// Whether this cache hands out QUIC connections (vs. plain UDP).
    pub fn use_quic(&self) -> bool {
        self.use_quic
    }

    /// Build the lazily-initialized QUIC endpoint for a new pool, or `None`
    /// when the pool will be UDP (QUIC disabled, or forced off for this
    /// address, e.g. because the QUIC port offset would overflow).
    fn create_endpoint(&self, force_use_udp: bool) -> Option<Arc<QuicLazyInitializedEndpoint>> {
        if self.use_quic() && !force_use_udp {
            Some(Arc::new(QuicLazyInitializedEndpoint::new(
                self.client_certificate.clone(),
            )))
        } else {
            None
        }
    }

    /// Maximum number of parallel uni streams this client may open, derived
    /// from stake. The client counts as unstaked unless a stake map and the
    /// client pubkey were both registered AND the pubkey is present in the
    /// map; otherwise its stake and the total stake are passed through to
    /// `compute_max_allowed_uni_streams`.
    fn compute_max_parallel_streams(&self) -> usize {
        let (client_type, stake, total_stake) =
            self.maybe_client_pubkey
                .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| {
                    self.maybe_staked_nodes.as_ref().map_or(
                        (ConnectionPeerType::Unstaked, 0, 0),
                        |stakes| {
                            let rstakes = stakes.read().unwrap();
                            rstakes.pubkey_stake_map.get(&pubkey).map_or(
                                (ConnectionPeerType::Unstaked, 0, rstakes.total_stake),
                                |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake),
                            )
                        },
                    )
                });
        compute_max_allowed_uni_streams(client_type, stake, total_stake)
    }

    /// Create a lazy connection object under the exclusive lock of the cache map if there is not
    /// enough used connections in the connection pool for the specified address.
    /// Returns CreateConnectionResult.
    fn create_connection(
        &self,
        lock_timing_ms: &mut u64,
        addr: &SocketAddr,
        force_use_udp: bool,
    ) -> CreateConnectionResult {
        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
        let mut map = self.map.write().unwrap();
        get_connection_map_lock_measure.stop();
        // Accumulate write-lock wait time into the caller's running total.
        *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
        // Read again, as it is possible that between read lock dropped and the write lock acquired
        // another thread could have setup the connection.

        let (to_create_connection, endpoint) =
            map.get(addr)
                .map_or((true, self.create_endpoint(force_use_udp)), |pool| {
                    (
                        pool.need_new_connection(self.connection_pool_size),
                        pool.endpoint.clone(),
                    )
                });

        let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection {
            // `endpoint` is Some exactly when QUIC is in use and not forced
            // off, so the unwrap below cannot fail on the QUIC branch.
            let connection = if !self.use_quic() || force_use_udp {
                BaseTpuConnection::Udp(self.tpu_udp_socket.clone())
            } else {
                BaseTpuConnection::Quic(Arc::new(QuicClient::new(
                    endpoint.as_ref().unwrap().clone(),
                    *addr,
                    self.compute_max_parallel_streams(),
                )))
            };

            let connection = Arc::new(connection);

            // evict a connection if the cache is reaching upper bounds
            let mut num_evictions = 0;
            let mut get_connection_cache_eviction_measure =
                Measure::start("get_connection_cache_eviction_measure");
            while map.len() >= MAX_CONNECTIONS {
                // Evict a uniformly random entry; swap_remove_index is O(1).
                let mut rng = thread_rng();
                let n = rng.gen_range(0, MAX_CONNECTIONS);
                map.swap_remove_index(n);
                num_evictions += 1;
            }
            get_connection_cache_eviction_measure.stop();

            // Append to an existing pool or create one carrying the shared
            // endpoint computed above.
            match map.entry(*addr) {
                Entry::Occupied(mut entry) => {
                    let pool = entry.get_mut();
                    pool.connections.push(connection);
                }
                Entry::Vacant(entry) => {
                    entry.insert(ConnectionPool {
                        connections: vec![connection],
                        endpoint,
                    });
                }
            }
            (
                false,
                num_evictions,
                get_connection_cache_eviction_measure.as_ms(),
            )
        } else {
            (true, 0, 0)
        };

        // The entry was just inserted (or already satisfied the pool size),
        // so the lookup and borrow cannot fail here.
        let pool = map.get(addr).unwrap();
        let connection = pool.borrow_connection();

        CreateConnectionResult {
            connection,
            cache_hit,
            connection_cache_stats: self.stats.clone(),
            num_evictions,
            eviction_timing_ms,
        }
    }

    /// Look up (or build) a pooled connection for `addr`, collecting timing
    /// data along the way. Applies the QUIC port offset when QUIC is enabled;
    /// if that offset would overflow the u16 port, falls back to the original
    /// port and forces UDP for this address.
    fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
        let map = self.map.read().unwrap();
        get_connection_map_lock_measure.stop();

        let port_offset = if self.use_quic() { QUIC_PORT_OFFSET } else { 0 };

        let port = addr
            .port()
            .checked_add(port_offset)
            .unwrap_or_else(|| addr.port());
        // Port unchanged means either UDP mode or an overflowed offset;
        // either way the connection must be UDP.
        let force_use_udp = port == addr.port();
        let addr = SocketAddr::new(addr.ip(), port);

        let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();

        let report_stats = self
            .last_stats
            .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);

        let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
        let CreateConnectionResult {
            connection,
            cache_hit,
            connection_cache_stats,
            num_evictions,
            eviction_timing_ms,
        } = match map.get(&addr) {
            Some(pool) => {
                if pool.need_new_connection(self.connection_pool_size) {
                    // create more connection and put it in the pool
                    drop(map);
                    self.create_connection(&mut lock_timing_ms, &addr, force_use_udp)
                } else {
                    let connection = pool.borrow_connection();
                    CreateConnectionResult {
                        connection,
                        cache_hit: true,
                        connection_cache_stats: self.stats.clone(),
                        num_evictions: 0,
                        eviction_timing_ms: 0,
                    }
                }
            }
            None => {
                // Upgrade to write access by dropping read lock and acquire write lock
                drop(map);
                self.create_connection(&mut lock_timing_ms, &addr, force_use_udp)
            }
        };
        get_connection_map_measure.stop();

        GetConnectionResult {
            connection,
            cache_hit,
            report_stats,
            map_timing_ms: get_connection_map_measure.as_ms(),
            lock_timing_ms,
            connection_cache_stats,
            num_evictions,
            eviction_timing_ms,
        }
    }

    /// Fetch a connection for `addr` and fold the lookup's timing/outcome
    /// into the cache stats, emitting a report when the submission interval
    /// has elapsed.
    fn get_connection_and_log_stats(
        &self,
        addr: &SocketAddr,
    ) -> (Arc<BaseTpuConnection>, Arc<ConnectionCacheStats>) {
        let mut get_connection_measure = Measure::start("get_connection_measure");
        let GetConnectionResult {
            connection,
            cache_hit,
            report_stats,
            map_timing_ms,
            lock_timing_ms,
            connection_cache_stats,
            num_evictions,
            eviction_timing_ms,
        } = self.get_or_add_connection(addr);

        if report_stats {
            connection_cache_stats.report();
        }

        // Hit and miss contribute to disjoint timing counters; evictions can
        // only have happened on the miss path.
        if cache_hit {
            connection_cache_stats
                .cache_hits
                .fetch_add(1, Ordering::Relaxed);
            connection_cache_stats
                .get_connection_hit_ms
                .fetch_add(map_timing_ms, Ordering::Relaxed);
        } else {
            connection_cache_stats
                .cache_misses
                .fetch_add(1, Ordering::Relaxed);
            connection_cache_stats
                .get_connection_miss_ms
                .fetch_add(map_timing_ms, Ordering::Relaxed);
            connection_cache_stats
                .cache_evictions
                .fetch_add(num_evictions, Ordering::Relaxed);
            connection_cache_stats
                .eviction_time_ms
                .fetch_add(eviction_timing_ms, Ordering::Relaxed);
        }

        get_connection_measure.stop();
        connection_cache_stats
            .get_connection_lock_ms
            .fetch_add(lock_timing_ms, Ordering::Relaxed);
        connection_cache_stats
            .get_connection_ms
            .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);

        (connection, connection_cache_stats)
    }

    /// Get a blocking TPU connection to `addr` (QUIC or UDP per the cache's
    /// configuration).
    pub fn get_connection(&self, addr: &SocketAddr) -> BlockingConnection {
        let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
        connection.new_blocking_connection(*addr, connection_cache_stats)
    }

    /// Get a non-blocking (async) TPU connection to `addr`.
    pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingConnection {
        let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
        connection.new_nonblocking_connection(*addr, connection_cache_stats)
    }
}
564
565impl Default for ConnectionCache {
566    fn default() -> Self {
567        let (certs, priv_key) = new_self_signed_tls_certificate_chain(
568            &Keypair::new(),
569            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
570        )
571        .expect("Failed to initialize QUIC client certificates");
572        Self {
573            map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
574            stats: Arc::new(ConnectionCacheStats::default()),
575            last_stats: AtomicInterval::default(),
576            connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
577            tpu_udp_socket: Arc::new(
578                safecoin_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
579                    .expect("Unable to bind to UDP socket"),
580            ),
581            client_certificate: Arc::new(QuicClientCertificate {
582                certificates: certs,
583                key: priv_key,
584            }),
585            use_quic: DEFAULT_TPU_USE_QUIC,
586            maybe_staked_nodes: None,
587            maybe_client_pubkey: None,
588        }
589    }
590}
591
/// The transport actually held in a pool: a shared local UDP socket or a
/// QUIC client. Wrapped into blocking/non-blocking connection types on use.
enum BaseTpuConnection {
    Udp(Arc<UdpSocket>),
    Quic(Arc<QuicClient>),
}
impl BaseTpuConnection {
    /// Wrap this transport into a blocking TPU connection targeting `addr`.
    /// `stats` is only consumed by the QUIC variant.
    fn new_blocking_connection(
        &self,
        addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> BlockingConnection {
        use crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
        match self {
            BaseTpuConnection::Udp(udp_socket) => {
                UdpTpuConnection::new_from_addr(udp_socket.clone(), addr).into()
            }
            BaseTpuConnection::Quic(quic_client) => {
                QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
            }
        }
    }

    /// Wrap this transport into a non-blocking (async) TPU connection.
    /// Note the UDP variant duplicates the socket fd via `try_clone` rather
    /// than cloning the `Arc`, and unwraps on failure.
    fn new_nonblocking_connection(
        &self,
        addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> NonblockingConnection {
        use crate::nonblocking::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
        match self {
            BaseTpuConnection::Udp(udp_socket) => {
                UdpTpuConnection::new_from_addr(udp_socket.try_clone().unwrap(), addr).into()
            }
            BaseTpuConnection::Quic(quic_client) => {
                QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
            }
        }
    }
}
629
/// Result of `get_or_add_connection`: the borrowed connection plus the
/// hit/miss outcome and timing data to be folded into the stats.
struct GetConnectionResult {
    connection: Arc<BaseTpuConnection>,
    cache_hit: bool,
    // True when the stat submission interval elapsed and report() should run.
    report_stats: bool,
    map_timing_ms: u64,
    lock_timing_ms: u64,
    connection_cache_stats: Arc<ConnectionCacheStats>,
    num_evictions: u64,
    eviction_timing_ms: u64,
}
640
/// Result of `create_connection`: the borrowed connection, whether the pool
/// already satisfied the request (cache_hit), and eviction accounting.
struct CreateConnectionResult {
    connection: Arc<BaseTpuConnection>,
    cache_hit: bool,
    connection_cache_stats: Arc<ConnectionCacheStats>,
    num_evictions: u64,
    eviction_timing_ms: u64,
}
648
#[cfg(test)]
mod tests {
    use {
        crate::{
            connection_cache::{ConnectionCache, MAX_CONNECTIONS},
            tpu_connection::TpuConnection,
        },
        rand::{Rng, SeedableRng},
        rand_chacha::ChaChaRng,
        solana_sdk::{
            pubkey::Pubkey,
            quic::{
                QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
                QUIC_PORT_OFFSET,
            },
        },
        solana_streamer::streamer::StakedNodes,
        std::{
            net::{IpAddr, Ipv4Addr, SocketAddr},
            sync::{Arc, RwLock},
        },
    };

    /// Produce a deterministic pseudorandom IPv4 address (octets in 1..255)
    /// on port 80, driven by the seeded `rng`.
    fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
        let a = rng.gen_range(1, 255);
        let b = rng.gen_range(1, 255);
        let c = rng.gen_range(1, 255);
        let d = rng.gen_range(1, 255);

        let addr_str = format!("{}.{}.{}.{}:80", a, b, c, d);

        addr_str.parse().expect("Invalid address")
    }

    #[test]
    fn test_connection_cache() {
        solana_logger::setup();
        // Allow the test to run deterministically
        // with the same pseudorandom sequence between runs
        // and on different platforms - the cryptographic security
        // property isn't important here but ChaChaRng provides a way
        // to get the same pseudorandom sequence on different platforms
        let mut rng = ChaChaRng::seed_from_u64(42);

        // Generate a bunch of random addresses and create TPUConnections to them
        // Since TPUConnection::new is infallible, it should't matter whether or not
        // we can actually connect to those addresses - TPUConnection implementations should either
        // be lazy and not connect until first use or handle connection errors somehow
        // (without crashing, as would be required in a real practical validator)
        let connection_cache = ConnectionCache::default();
        let port_offset = if connection_cache.use_quic() {
            QUIC_PORT_OFFSET
        } else {
            0
        };
        // Ranges are already iterators; no `.into_iter()` needed.
        let addrs = (0..MAX_CONNECTIONS)
            .map(|_| {
                let addr = get_addr(&mut rng);
                connection_cache.get_connection(&addr);
                addr
            })
            .collect::<Vec<_>>();
        {
            let map = connection_cache.map.read().unwrap();
            assert_eq!(map.len(), MAX_CONNECTIONS);
            addrs.iter().for_each(|a| {
                let port = a
                    .port()
                    .checked_add(port_offset)
                    .unwrap_or_else(|| a.port());
                let addr = &SocketAddr::new(a.ip(), port);

                let conn = &map.get(addr).expect("Address not found").connections[0];
                let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
                assert_eq!(addr.ip(), conn.tpu_addr().ip());
            });
        }

        // One more address should keep the map at its cap (random eviction)
        // while still inserting the new entry.
        let addr = &get_addr(&mut rng);
        connection_cache.get_connection(addr);

        let port = addr
            .port()
            .checked_add(port_offset)
            .unwrap_or_else(|| addr.port());
        let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
        let map = connection_cache.map.read().unwrap();
        assert_eq!(map.len(), MAX_CONNECTIONS);
        let _conn = map.get(&addr_with_quic_port).expect("Address not found");
    }

    #[test]
    fn test_connection_cache_max_parallel_chunks() {
        solana_logger::setup();
        let mut connection_cache = ConnectionCache::default();
        // No stake info registered: unstaked limit.
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let pubkey = Pubkey::new_unique();
        connection_cache.set_staked_nodes(&staked_nodes, &pubkey);
        // Stake map registered but pubkey absent: still unstaked.
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        staked_nodes.write().unwrap().total_stake = 10000;
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        // Tiny stake: staked, but at the minimum staked stream limit.
        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .insert(pubkey, 1);
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );

        // Meaningful stake: limit scales above the minimum.
        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .remove(&pubkey);
        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .insert(pubkey, 1000);
        assert_ne!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );
    }

    // Test that we can get_connection with a connection cache configured for quic
    // on an address with a port that, if QUIC_PORT_OFFSET were added to it, it would overflow to
    // an invalid port.
    #[test]
    fn test_overflow_address() {
        let port = u16::MAX - QUIC_PORT_OFFSET + 1;
        assert!(port.checked_add(QUIC_PORT_OFFSET).is_none());
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
        let connection_cache = ConnectionCache::new(1);

        let conn = connection_cache.get_connection(&addr);
        // We (intentionally) don't have an interface that allows us to distinguish between
        // UDP and Quic connections, so check instead that the port is valid (non-zero)
        // and is the same as the input port (falling back on UDP)
        assert_ne!(conn.tpu_addr().port(), 0);
        assert_eq!(conn.tpu_addr().port(), port);
    }
}
807}