1use {
2 crate::{
3 nonblocking::{
4 quic_client::{QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint},
5 tpu_connection::NonblockingConnection,
6 },
7 tpu_connection::{BlockingConnection, ClientStats},
8 },
9 indexmap::map::{Entry, IndexMap},
10 rand::{thread_rng, Rng},
11 safecoin_measure::measure::Measure,
12 solana_sdk::{
13 pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval,
14 },
15 solana_streamer::{
16 nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
17 streamer::StakedNodes,
18 tls_certificates::new_self_signed_tls_certificate_chain,
19 },
20 std::{
21 error::Error,
22 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
23 sync::{
24 atomic::{AtomicU64, Ordering},
25 Arc, RwLock,
26 },
27 },
28};
29
/// Upper bound on the number of addresses held in the connection map. Once the
/// map holds this many entries, `create_connection` evicts randomly chosen
/// entries to make room.
static MAX_CONNECTIONS: usize = 1024;

/// Default for whether QUIC is used; `ConnectionCache::default` copies this
/// value into its `use_quic` field.
pub const DEFAULT_TPU_USE_QUIC: bool = true;

/// Default number of connections kept per address; used by
/// `ConnectionCache::default` as the pool size.
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;

/// Default for enabling UDP on the TPU.
/// NOTE(review): not referenced anywhere in this file; presumably consumed by
/// callers elsewhere — confirm.
pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
41
/// Counters describing cache behaviour and send outcomes.
///
/// The counters are accumulated with relaxed atomics and flushed by `report`.
#[derive(Default)]
pub struct ConnectionCacheStats {
    /// Lookups that returned a connection from an already-populated pool.
    cache_hits: AtomicU64,
    /// Lookups that had to create a new connection.
    cache_misses: AtomicU64,
    /// Number of map entries evicted while creating connections.
    cache_evictions: AtomicU64,
    /// Time spent in the eviction loop, in milliseconds.
    eviction_time_ms: AtomicU64,
    /// Packets passed to `add_client_stats`.
    sent_packets: AtomicU64,
    /// Number of `add_client_stats` calls (one per batch).
    total_batches: AtomicU64,
    /// Batches recorded with `is_success == true`.
    batch_success: AtomicU64,
    /// Batches recorded with `is_success == false`.
    batch_failure: AtomicU64,
    /// Total time spent in `get_connection_and_log_stats`, in milliseconds.
    get_connection_ms: AtomicU64,
    /// Time spent waiting for the connection-map lock, in milliseconds.
    get_connection_lock_ms: AtomicU64,
    /// Map lookup time attributed to cache hits, in milliseconds.
    get_connection_hit_ms: AtomicU64,
    /// Map lookup time attributed to cache misses, in milliseconds.
    get_connection_miss_ms: AtomicU64,

    /// Aggregated per-client statistics; public so other modules can update it directly.
    pub total_client_stats: ClientStats,
}
61
/// Interval handed to `AtomicInterval::should_update` to decide when the cache
/// statistics are reported.
/// NOTE(review): presumably milliseconds (i.e. every 2 s) — confirm against `AtomicInterval`.
const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
63
64impl ConnectionCacheStats {
65 pub fn add_client_stats(
66 &self,
67 client_stats: &ClientStats,
68 num_packets: usize,
69 is_success: bool,
70 ) {
71 self.total_client_stats.total_connections.fetch_add(
72 client_stats.total_connections.load(Ordering::Relaxed),
73 Ordering::Relaxed,
74 );
75 self.total_client_stats.connection_reuse.fetch_add(
76 client_stats.connection_reuse.load(Ordering::Relaxed),
77 Ordering::Relaxed,
78 );
79 self.total_client_stats.connection_errors.fetch_add(
80 client_stats.connection_errors.load(Ordering::Relaxed),
81 Ordering::Relaxed,
82 );
83 self.total_client_stats.zero_rtt_accepts.fetch_add(
84 client_stats.zero_rtt_accepts.load(Ordering::Relaxed),
85 Ordering::Relaxed,
86 );
87 self.total_client_stats.zero_rtt_rejects.fetch_add(
88 client_stats.zero_rtt_rejects.load(Ordering::Relaxed),
89 Ordering::Relaxed,
90 );
91 self.total_client_stats.make_connection_ms.fetch_add(
92 client_stats.make_connection_ms.load(Ordering::Relaxed),
93 Ordering::Relaxed,
94 );
95 self.total_client_stats.send_timeout.fetch_add(
96 client_stats.send_timeout.load(Ordering::Relaxed),
97 Ordering::Relaxed,
98 );
99 self.sent_packets
100 .fetch_add(num_packets as u64, Ordering::Relaxed);
101 self.total_batches.fetch_add(1, Ordering::Relaxed);
102 if is_success {
103 self.batch_success.fetch_add(1, Ordering::Relaxed);
104 } else {
105 self.batch_failure.fetch_add(1, Ordering::Relaxed);
106 }
107 }
108
109 fn report(&self) {
110 datapoint_info!(
111 "quic-client-connection-stats",
112 (
113 "cache_hits",
114 self.cache_hits.swap(0, Ordering::Relaxed),
115 i64
116 ),
117 (
118 "cache_misses",
119 self.cache_misses.swap(0, Ordering::Relaxed),
120 i64
121 ),
122 (
123 "cache_evictions",
124 self.cache_evictions.swap(0, Ordering::Relaxed),
125 i64
126 ),
127 (
128 "eviction_time_ms",
129 self.eviction_time_ms.swap(0, Ordering::Relaxed),
130 i64
131 ),
132 (
133 "get_connection_ms",
134 self.get_connection_ms.swap(0, Ordering::Relaxed),
135 i64
136 ),
137 (
138 "get_connection_lock_ms",
139 self.get_connection_lock_ms.swap(0, Ordering::Relaxed),
140 i64
141 ),
142 (
143 "get_connection_hit_ms",
144 self.get_connection_hit_ms.swap(0, Ordering::Relaxed),
145 i64
146 ),
147 (
148 "get_connection_miss_ms",
149 self.get_connection_miss_ms.swap(0, Ordering::Relaxed),
150 i64
151 ),
152 (
153 "make_connection_ms",
154 self.total_client_stats
155 .make_connection_ms
156 .swap(0, Ordering::Relaxed),
157 i64
158 ),
159 (
160 "total_connections",
161 self.total_client_stats
162 .total_connections
163 .swap(0, Ordering::Relaxed),
164 i64
165 ),
166 (
167 "connection_reuse",
168 self.total_client_stats
169 .connection_reuse
170 .swap(0, Ordering::Relaxed),
171 i64
172 ),
173 (
174 "connection_errors",
175 self.total_client_stats
176 .connection_errors
177 .swap(0, Ordering::Relaxed),
178 i64
179 ),
180 (
181 "zero_rtt_accepts",
182 self.total_client_stats
183 .zero_rtt_accepts
184 .swap(0, Ordering::Relaxed),
185 i64
186 ),
187 (
188 "zero_rtt_rejects",
189 self.total_client_stats
190 .zero_rtt_rejects
191 .swap(0, Ordering::Relaxed),
192 i64
193 ),
194 (
195 "congestion_events",
196 self.total_client_stats.congestion_events.load_and_reset(),
197 i64
198 ),
199 (
200 "tx_streams_blocked_uni",
201 self.total_client_stats
202 .tx_streams_blocked_uni
203 .load_and_reset(),
204 i64
205 ),
206 (
207 "tx_data_blocked",
208 self.total_client_stats.tx_data_blocked.load_and_reset(),
209 i64
210 ),
211 (
212 "tx_acks",
213 self.total_client_stats.tx_acks.load_and_reset(),
214 i64
215 ),
216 (
217 "num_packets",
218 self.sent_packets.swap(0, Ordering::Relaxed),
219 i64
220 ),
221 (
222 "total_batches",
223 self.total_batches.swap(0, Ordering::Relaxed),
224 i64
225 ),
226 (
227 "batch_failure",
228 self.batch_failure.swap(0, Ordering::Relaxed),
229 i64
230 ),
231 (
232 "send_timeout",
233 self.total_client_stats
234 .send_timeout
235 .swap(0, Ordering::Relaxed),
236 i64
237 ),
238 );
239 }
240}
241
/// Cache of TPU connections, holding one pool of connections per socket address.
pub struct ConnectionCache {
    /// Connection pools keyed by the (port-offset-adjusted) peer address.
    map: RwLock<IndexMap<SocketAddr, ConnectionPool>>,
    /// Shared statistics, also handed to the connections created from this cache.
    stats: Arc<ConnectionCacheStats>,
    /// Gate deciding when `stats` should be reported.
    last_stats: AtomicInterval,
    /// Number of connections a pool should hold before lookups stop creating new ones.
    connection_pool_size: usize,
    /// Socket shared by every UDP connection produced by this cache.
    tpu_udp_socket: Arc<UdpSocket>,
    /// Certificate passed to newly created QUIC endpoints.
    client_certificate: Arc<QuicClientCertificate>,
    /// Whether connections use QUIC (otherwise UDP).
    use_quic: bool,
    /// Stake information consulted when computing the allowed parallel streams.
    maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
    /// Identity looked up in `maybe_staked_nodes` when computing parallel streams.
    maybe_client_pubkey: Option<Pubkey>,
}
253
/// The set of connections cached for a single peer address.
struct ConnectionPool {
    /// Connections available for this address; one is picked at random per request.
    connections: Vec<Arc<BaseTpuConnection>>,

    /// QUIC endpoint reused by every connection in this pool; `None` when the
    /// pool was created without QUIC.
    endpoint: Option<Arc<QuicLazyInitializedEndpoint>>,
}
262
263impl ConnectionPool {
264 fn borrow_connection(&self) -> Arc<BaseTpuConnection> {
267 let mut rng = thread_rng();
268 let n = rng.gen_range(0, self.connections.len());
269 self.connections[n].clone()
270 }
271
272 fn need_new_connection(&self, required_pool_size: usize) -> bool {
275 self.connections.len() < required_pool_size
276 }
277}
278
279impl ConnectionCache {
280 pub fn new(connection_pool_size: usize) -> Self {
281 let connection_pool_size = 1.max(connection_pool_size);
283 Self {
284 use_quic: true,
285 connection_pool_size,
286 ..Self::default()
287 }
288 }
289
290 pub fn update_client_certificate(
291 &mut self,
292 keypair: &Keypair,
293 ipaddr: IpAddr,
294 ) -> Result<(), Box<dyn Error>> {
295 let (certs, priv_key) = new_self_signed_tls_certificate_chain(keypair, ipaddr)?;
296 self.client_certificate = Arc::new(QuicClientCertificate {
297 certificates: certs,
298 key: priv_key,
299 });
300 Ok(())
301 }
302
303 pub fn set_staked_nodes(
304 &mut self,
305 staked_nodes: &Arc<RwLock<StakedNodes>>,
306 client_pubkey: &Pubkey,
307 ) {
308 self.maybe_staked_nodes = Some(staked_nodes.clone());
309 self.maybe_client_pubkey = Some(*client_pubkey);
310 }
311
312 pub fn with_udp(connection_pool_size: usize) -> Self {
313 let connection_pool_size = 1.max(connection_pool_size);
315 Self {
316 use_quic: false,
317 connection_pool_size,
318 ..Self::default()
319 }
320 }
321
322 pub fn use_quic(&self) -> bool {
323 self.use_quic
324 }
325
326 fn create_endpoint(&self, force_use_udp: bool) -> Option<Arc<QuicLazyInitializedEndpoint>> {
327 if self.use_quic() && !force_use_udp {
328 Some(Arc::new(QuicLazyInitializedEndpoint::new(
329 self.client_certificate.clone(),
330 )))
331 } else {
332 None
333 }
334 }
335
336 fn compute_max_parallel_streams(&self) -> usize {
337 let (client_type, stake, total_stake) =
338 self.maybe_client_pubkey
339 .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| {
340 self.maybe_staked_nodes.as_ref().map_or(
341 (ConnectionPeerType::Unstaked, 0, 0),
342 |stakes| {
343 let rstakes = stakes.read().unwrap();
344 rstakes.pubkey_stake_map.get(&pubkey).map_or(
345 (ConnectionPeerType::Unstaked, 0, rstakes.total_stake),
346 |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake),
347 )
348 },
349 )
350 });
351 compute_max_allowed_uni_streams(client_type, stake, total_stake)
352 }
353
354 fn create_connection(
358 &self,
359 lock_timing_ms: &mut u64,
360 addr: &SocketAddr,
361 force_use_udp: bool,
362 ) -> CreateConnectionResult {
363 let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
364 let mut map = self.map.write().unwrap();
365 get_connection_map_lock_measure.stop();
366 *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
367 let (to_create_connection, endpoint) =
371 map.get(addr)
372 .map_or((true, self.create_endpoint(force_use_udp)), |pool| {
373 (
374 pool.need_new_connection(self.connection_pool_size),
375 pool.endpoint.clone(),
376 )
377 });
378
379 let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection {
380 let connection = if !self.use_quic() || force_use_udp {
381 BaseTpuConnection::Udp(self.tpu_udp_socket.clone())
382 } else {
383 BaseTpuConnection::Quic(Arc::new(QuicClient::new(
384 endpoint.as_ref().unwrap().clone(),
385 *addr,
386 self.compute_max_parallel_streams(),
387 )))
388 };
389
390 let connection = Arc::new(connection);
391
392 let mut num_evictions = 0;
394 let mut get_connection_cache_eviction_measure =
395 Measure::start("get_connection_cache_eviction_measure");
396 while map.len() >= MAX_CONNECTIONS {
397 let mut rng = thread_rng();
398 let n = rng.gen_range(0, MAX_CONNECTIONS);
399 map.swap_remove_index(n);
400 num_evictions += 1;
401 }
402 get_connection_cache_eviction_measure.stop();
403
404 match map.entry(*addr) {
405 Entry::Occupied(mut entry) => {
406 let pool = entry.get_mut();
407 pool.connections.push(connection);
408 }
409 Entry::Vacant(entry) => {
410 entry.insert(ConnectionPool {
411 connections: vec![connection],
412 endpoint,
413 });
414 }
415 }
416 (
417 false,
418 num_evictions,
419 get_connection_cache_eviction_measure.as_ms(),
420 )
421 } else {
422 (true, 0, 0)
423 };
424
425 let pool = map.get(addr).unwrap();
426 let connection = pool.borrow_connection();
427
428 CreateConnectionResult {
429 connection,
430 cache_hit,
431 connection_cache_stats: self.stats.clone(),
432 num_evictions,
433 eviction_timing_ms,
434 }
435 }
436
437 fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
438 let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
439 let map = self.map.read().unwrap();
440 get_connection_map_lock_measure.stop();
441
442 let port_offset = if self.use_quic() { QUIC_PORT_OFFSET } else { 0 };
443
444 let port = addr
445 .port()
446 .checked_add(port_offset)
447 .unwrap_or_else(|| addr.port());
448 let force_use_udp = port == addr.port();
449 let addr = SocketAddr::new(addr.ip(), port);
450
451 let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
452
453 let report_stats = self
454 .last_stats
455 .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
456
457 let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
458 let CreateConnectionResult {
459 connection,
460 cache_hit,
461 connection_cache_stats,
462 num_evictions,
463 eviction_timing_ms,
464 } = match map.get(&addr) {
465 Some(pool) => {
466 if pool.need_new_connection(self.connection_pool_size) {
467 drop(map);
469 self.create_connection(&mut lock_timing_ms, &addr, force_use_udp)
470 } else {
471 let connection = pool.borrow_connection();
472 CreateConnectionResult {
473 connection,
474 cache_hit: true,
475 connection_cache_stats: self.stats.clone(),
476 num_evictions: 0,
477 eviction_timing_ms: 0,
478 }
479 }
480 }
481 None => {
482 drop(map);
484 self.create_connection(&mut lock_timing_ms, &addr, force_use_udp)
485 }
486 };
487 get_connection_map_measure.stop();
488
489 GetConnectionResult {
490 connection,
491 cache_hit,
492 report_stats,
493 map_timing_ms: get_connection_map_measure.as_ms(),
494 lock_timing_ms,
495 connection_cache_stats,
496 num_evictions,
497 eviction_timing_ms,
498 }
499 }
500
501 fn get_connection_and_log_stats(
502 &self,
503 addr: &SocketAddr,
504 ) -> (Arc<BaseTpuConnection>, Arc<ConnectionCacheStats>) {
505 let mut get_connection_measure = Measure::start("get_connection_measure");
506 let GetConnectionResult {
507 connection,
508 cache_hit,
509 report_stats,
510 map_timing_ms,
511 lock_timing_ms,
512 connection_cache_stats,
513 num_evictions,
514 eviction_timing_ms,
515 } = self.get_or_add_connection(addr);
516
517 if report_stats {
518 connection_cache_stats.report();
519 }
520
521 if cache_hit {
522 connection_cache_stats
523 .cache_hits
524 .fetch_add(1, Ordering::Relaxed);
525 connection_cache_stats
526 .get_connection_hit_ms
527 .fetch_add(map_timing_ms, Ordering::Relaxed);
528 } else {
529 connection_cache_stats
530 .cache_misses
531 .fetch_add(1, Ordering::Relaxed);
532 connection_cache_stats
533 .get_connection_miss_ms
534 .fetch_add(map_timing_ms, Ordering::Relaxed);
535 connection_cache_stats
536 .cache_evictions
537 .fetch_add(num_evictions, Ordering::Relaxed);
538 connection_cache_stats
539 .eviction_time_ms
540 .fetch_add(eviction_timing_ms, Ordering::Relaxed);
541 }
542
543 get_connection_measure.stop();
544 connection_cache_stats
545 .get_connection_lock_ms
546 .fetch_add(lock_timing_ms, Ordering::Relaxed);
547 connection_cache_stats
548 .get_connection_ms
549 .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);
550
551 (connection, connection_cache_stats)
552 }
553
554 pub fn get_connection(&self, addr: &SocketAddr) -> BlockingConnection {
555 let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
556 connection.new_blocking_connection(*addr, connection_cache_stats)
557 }
558
559 pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingConnection {
560 let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
561 connection.new_nonblocking_connection(*addr, connection_cache_stats)
562 }
563}
564
565impl Default for ConnectionCache {
566 fn default() -> Self {
567 let (certs, priv_key) = new_self_signed_tls_certificate_chain(
568 &Keypair::new(),
569 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
570 )
571 .expect("Failed to initialize QUIC client certificates");
572 Self {
573 map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
574 stats: Arc::new(ConnectionCacheStats::default()),
575 last_stats: AtomicInterval::default(),
576 connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
577 tpu_udp_socket: Arc::new(
578 safecoin_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
579 .expect("Unable to bind to UDP socket"),
580 ),
581 client_certificate: Arc::new(QuicClientCertificate {
582 certificates: certs,
583 key: priv_key,
584 }),
585 use_quic: DEFAULT_TPU_USE_QUIC,
586 maybe_staked_nodes: None,
587 maybe_client_pubkey: None,
588 }
589 }
590}
591
/// The transport-level handle stored in a connection pool, from which blocking
/// and non-blocking connection wrappers are built.
enum BaseTpuConnection {
    /// A shared UDP socket.
    Udp(Arc<UdpSocket>),
    /// A shared QUIC client.
    Quic(Arc<QuicClient>),
}
596impl BaseTpuConnection {
597 fn new_blocking_connection(
598 &self,
599 addr: SocketAddr,
600 stats: Arc<ConnectionCacheStats>,
601 ) -> BlockingConnection {
602 use crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
603 match self {
604 BaseTpuConnection::Udp(udp_socket) => {
605 UdpTpuConnection::new_from_addr(udp_socket.clone(), addr).into()
606 }
607 BaseTpuConnection::Quic(quic_client) => {
608 QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
609 }
610 }
611 }
612
613 fn new_nonblocking_connection(
614 &self,
615 addr: SocketAddr,
616 stats: Arc<ConnectionCacheStats>,
617 ) -> NonblockingConnection {
618 use crate::nonblocking::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
619 match self {
620 BaseTpuConnection::Udp(udp_socket) => {
621 UdpTpuConnection::new_from_addr(udp_socket.try_clone().unwrap(), addr).into()
622 }
623 BaseTpuConnection::Quic(quic_client) => {
624 QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
625 }
626 }
627 }
628}
629
/// Outcome of `get_or_add_connection`: the connection plus the measurements
/// needed to update the cache statistics.
struct GetConnectionResult {
    /// The connection selected from the pool.
    connection: Arc<BaseTpuConnection>,
    /// `true` when no new connection had to be created.
    cache_hit: bool,
    /// `true` when the stats submission interval has elapsed and stats should be reported.
    report_stats: bool,
    /// Time spent looking up (and possibly creating) the connection, in milliseconds.
    map_timing_ms: u64,
    /// Time spent acquiring the map lock(s), in milliseconds.
    lock_timing_ms: u64,
    /// Handle to the cache's shared statistics.
    connection_cache_stats: Arc<ConnectionCacheStats>,
    /// Number of map entries evicted during this lookup.
    num_evictions: u64,
    /// Time spent evicting entries, in milliseconds.
    eviction_timing_ms: u64,
}
640
/// Outcome of `create_connection` (also built directly on a read-path cache hit).
struct CreateConnectionResult {
    /// The connection selected from the pool.
    connection: Arc<BaseTpuConnection>,
    /// `true` when no new connection had to be created.
    cache_hit: bool,
    /// Handle to the cache's shared statistics.
    connection_cache_stats: Arc<ConnectionCacheStats>,
    /// Number of map entries evicted while creating the connection.
    num_evictions: u64,
    /// Time spent evicting entries, in milliseconds.
    eviction_timing_ms: u64,
}
648
#[cfg(test)]
mod tests {
    use {
        crate::{
            connection_cache::{ConnectionCache, MAX_CONNECTIONS},
            tpu_connection::TpuConnection,
        },
        rand::{Rng, SeedableRng},
        rand_chacha::ChaChaRng,
        solana_sdk::{
            pubkey::Pubkey,
            quic::{
                QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
                QUIC_PORT_OFFSET,
            },
        },
        solana_streamer::streamer::StakedNodes,
        std::{
            net::{IpAddr, Ipv4Addr, SocketAddr},
            sync::{Arc, RwLock},
        },
    };

    /// Builds a pseudo-random IPv4 address (each octet in `1..255`) on port 80.
    fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
        let a = rng.gen_range(1, 255);
        let b = rng.gen_range(1, 255);
        let c = rng.gen_range(1, 255);
        let d = rng.gen_range(1, 255);

        let addr_str = format!("{}.{}.{}.{}:80", a, b, c, d);

        addr_str.parse().expect("Invalid address")
    }

    /// Mirrors the cache's key computation: adds `port_offset` to the port,
    /// keeping the original port when the addition would overflow.
    fn with_port_offset(addr: &SocketAddr, port_offset: u16) -> SocketAddr {
        let port = addr.port().checked_add(port_offset).unwrap_or(addr.port());
        SocketAddr::new(addr.ip(), port)
    }

    /// Fills the cache to `MAX_CONNECTIONS`, checks every address is present,
    /// then adds one more address and checks the cache stays at its bound while
    /// containing the newcomer.
    #[test]
    fn test_connection_cache() {
        solana_logger::setup();
        let mut rng = ChaChaRng::seed_from_u64(42);

        let connection_cache = ConnectionCache::default();
        let port_offset = if connection_cache.use_quic() {
            QUIC_PORT_OFFSET
        } else {
            0
        };
        let addrs = (0..MAX_CONNECTIONS)
            .map(|_| {
                let addr = get_addr(&mut rng);
                connection_cache.get_connection(&addr);
                addr
            })
            .collect::<Vec<_>>();
        {
            let map = connection_cache.map.read().unwrap();
            assert_eq!(map.len(), MAX_CONNECTIONS);
            addrs.iter().for_each(|a| {
                let addr = &with_port_offset(a, port_offset);

                let conn = &map.get(addr).expect("Address not found").connections[0];
                let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
                assert_eq!(addr.ip(), conn.tpu_addr().ip());
            });
        }

        // One more address must evict an existing entry rather than grow the map.
        let addr = &get_addr(&mut rng);
        connection_cache.get_connection(addr);

        let addr_with_quic_port = with_port_offset(addr, port_offset);
        let map = connection_cache.map.read().unwrap();
        assert_eq!(map.len(), MAX_CONNECTIONS);
        let _conn = map.get(&addr_with_quic_port).expect("Address not found");
    }

    /// Walks the stream limit through the unstaked, minimally staked and more
    /// heavily staked cases.
    #[test]
    fn test_connection_cache_max_parallel_chunks() {
        solana_logger::setup();
        let mut connection_cache = ConnectionCache::default();
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let pubkey = Pubkey::new_unique();
        connection_cache.set_staked_nodes(&staked_nodes, &pubkey);
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        staked_nodes.write().unwrap().total_stake = 10000;
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .insert(pubkey, 1);
        assert_eq!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );

        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .remove(&pubkey);
        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .insert(pubkey, 1000);
        assert_ne!(
            connection_cache.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );
    }

    /// A port that would overflow when the QUIC offset is added must be used
    /// unchanged instead of wrapping around.
    #[test]
    fn test_overflow_address() {
        let port = u16::MAX - QUIC_PORT_OFFSET + 1;
        assert!(port.checked_add(QUIC_PORT_OFFSET).is_none());
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
        let connection_cache = ConnectionCache::new(1);

        let conn = connection_cache.get_connection(&addr);
        assert_ne!(conn.tpu_addr().port(), 0);
        assert_eq!(conn.tpu_addr().port(), port);
    }
}