1use {
2 crate::{
3 nonblocking::quic::ALPN_TPU_PROTOCOL_ID, streamer::StakedNodes,
4 tls_certificates::new_dummy_x509_certificate,
5 },
6 crossbeam_channel::Sender,
7 pem::Pem,
8 quinn::{
9 crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
10 Endpoint, IdleTimeout, ServerConfig,
11 },
12 rustls::{
13 pki_types::{CertificateDer, UnixTime},
14 server::danger::ClientCertVerified,
15 DistinguishedName, KeyLogFile,
16 },
17 solana_perf::packet::PacketBatch,
18 solana_sdk::{
19 packet::PACKET_DATA_SIZE,
20 quic::{NotifyKeyUpdate, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
21 signature::Keypair,
22 },
23 std::{
24 net::UdpSocket,
25 sync::{
26 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
27 Arc, Mutex, RwLock,
28 },
29 thread,
30 time::Duration,
31 },
32 tokio::runtime::Runtime,
33};
34
/// Connection limit that this file's tests pass to `spawn_server` as
/// `max_staked_connections`.
// NOTE(review): presumably the default cap on concurrent connections from
// staked peers; confirm against the non-test callers.
pub const MAX_STAKED_CONNECTIONS: usize = 2000;
/// Connection limit that this file's tests pass to `spawn_server` as
/// `max_unstaked_connections`.
// NOTE(review): presumably the default cap on concurrent connections from
// unstaked peers; confirm against the non-test callers.
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;

// NOTE(review): not referenced anywhere in this file; presumably the default
// number of QUIC endpoints a caller should create. Verify against callers.
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;
40
41#[derive(Debug)]
42pub struct SkipClientVerification(Arc<rustls::crypto::CryptoProvider>);
43
44impl SkipClientVerification {
45 pub fn new() -> Arc<Self> {
46 Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider())))
47 }
48}
49
/// Handles returned by [`spawn_server`] and [`spawn_server_multi`].
pub struct SpawnServerResult {
    /// Endpoints handed back by `crate::nonblocking::quic::spawn_server_multi`.
    pub endpoints: Vec<Endpoint>,
    /// Join handle of the OS thread that blocks on the server task inside the
    /// server's Tokio runtime.
    pub thread: thread::JoinHandle<()>,
    /// Updater holding clones of `endpoints`, used to install a new server
    /// configuration when the identity keypair changes.
    pub key_updater: Arc<EndpointKeyUpdater>,
}
55
56impl rustls::server::danger::ClientCertVerifier for SkipClientVerification {
57 fn verify_client_cert(
58 &self,
59 _end_entity: &CertificateDer,
60 _intermediates: &[CertificateDer],
61 _now: UnixTime,
62 ) -> Result<ClientCertVerified, rustls::Error> {
63 Ok(rustls::server::danger::ClientCertVerified::assertion())
64 }
65
66 fn root_hint_subjects(&self) -> &[DistinguishedName] {
67 &[]
68 }
69
70 fn verify_tls12_signature(
71 &self,
72 message: &[u8],
73 cert: &rustls::pki_types::CertificateDer<'_>,
74 dss: &rustls::DigitallySignedStruct,
75 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
76 rustls::crypto::verify_tls12_signature(
77 message,
78 cert,
79 dss,
80 &self.0.signature_verification_algorithms,
81 )
82 }
83
84 fn verify_tls13_signature(
85 &self,
86 message: &[u8],
87 cert: &rustls::pki_types::CertificateDer<'_>,
88 dss: &rustls::DigitallySignedStruct,
89 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
90 rustls::crypto::verify_tls13_signature(
91 message,
92 cert,
93 dss,
94 &self.0.signature_verification_algorithms,
95 )
96 }
97
98 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
99 self.0.signature_verification_algorithms.supported_schemes()
100 }
101
102 fn offer_client_auth(&self) -> bool {
103 true
104 }
105
106 fn client_auth_mandatory(&self) -> bool {
107 self.offer_client_auth()
108 }
109}
110
111#[allow(clippy::field_reassign_with_default)] pub(crate) fn configure_server(
114 identity_keypair: &Keypair,
115) -> Result<(ServerConfig, String), QuicServerError> {
116 let (cert, priv_key) = new_dummy_x509_certificate(identity_keypair);
117 let cert_chain_pem_parts = vec![Pem {
118 tag: "CERTIFICATE".to_string(),
119 contents: cert.as_ref().to_vec(),
120 }];
121 let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
122
123 let mut server_tls_config = rustls::ServerConfig::builder()
124 .with_client_cert_verifier(SkipClientVerification::new())
125 .with_single_cert(vec![cert], priv_key)?;
126 server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
127 server_tls_config.key_log = Arc::new(KeyLogFile::new());
128 let quic_server_config = QuicServerConfig::try_from(server_tls_config)?;
129
130 let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
131 let config = Arc::get_mut(&mut server_config.transport).unwrap();
132
133 const MAX_CONCURRENT_UNI_STREAMS: u32 =
135 (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS.saturating_mul(2)) as u32;
136 config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
137 config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
138 config.receive_window((PACKET_DATA_SIZE as u32).into());
139 let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
140 config.max_idle_timeout(Some(timeout));
141
142 const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
144 config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
145 config.datagram_receive_buffer_size(None);
146
147 config.enable_segmentation_offload(false);
152
153 Ok((server_config, cert_chain_pem))
154}
155
156pub fn rt(name: String) -> Runtime {
157 tokio::runtime::Builder::new_multi_thread()
158 .thread_name(name)
159 .enable_all()
160 .build()
161 .unwrap()
162}
163
/// Errors surfaced while configuring or spawning the QUIC server.
#[derive(thiserror::Error, Debug)]
pub enum QuicServerError {
    /// Wraps an I/O error from endpoint creation. Not constructed within this
    /// file.
    #[error("Endpoint creation failed: {0}")]
    EndpointFailed(std::io::Error),
    /// A rustls error, converted automatically via `?`.
    #[error("TLS error: {0}")]
    TlsError(#[from] rustls::Error),
    /// The TLS configuration could not be converted into a QUIC server
    /// configuration; converted automatically via `?`.
    #[error("No initial cipher suite")]
    NoInitialCipherSuite(#[from] NoInitialCipherSuite),
}
173
174pub struct EndpointKeyUpdater {
175 endpoints: Vec<Endpoint>,
176}
177
178impl NotifyKeyUpdate for EndpointKeyUpdater {
179 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
180 let (config, _) = configure_server(key)?;
181 for endpoint in &self.endpoints {
182 endpoint.set_server_config(Some(config.clone()));
183 }
184 Ok(())
185 }
186}
187
/// Atomic counters and gauges for the QUIC streamer, emitted by
/// [`StreamerStats::report`].
///
/// `report` resets most fields to zero each time it runs; the fields it only
/// loads keep their value between reports.
#[derive(Default)]
pub struct StreamerStats {
    // Reported as "active_connections"; not reset by `report`.
    pub(crate) total_connections: AtomicUsize,
    pub(crate) total_new_connections: AtomicUsize,
    // Reported as "active_streams"; not reset by `report`.
    pub(crate) total_streams: AtomicUsize,
    pub(crate) total_new_streams: AtomicUsize,
    pub(crate) invalid_stream_size: AtomicUsize,
    pub(crate) total_packets_allocated: AtomicUsize,
    pub(crate) total_packet_batches_allocated: AtomicUsize,
    pub(crate) total_chunks_received: AtomicUsize,
    pub(crate) total_staked_chunks_received: AtomicUsize,
    pub(crate) total_unstaked_chunks_received: AtomicUsize,
    pub(crate) total_packet_batch_send_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
    pub(crate) total_packet_batches_sent: AtomicUsize,
    // Reported as "packet_batch_empty".
    pub(crate) total_packet_batches_none: AtomicUsize,
    pub(crate) total_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
    pub(crate) total_chunks_sent_for_batching: AtomicUsize,
    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
    pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
    pub(crate) total_stream_read_errors: AtomicUsize,
    pub(crate) total_stream_read_timeouts: AtomicUsize,
    // Reported as "evictions".
    pub(crate) num_evictions: AtomicUsize,
    pub(crate) connection_added_from_staked_peer: AtomicUsize,
    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
    pub(crate) connection_add_failed: AtomicUsize,
    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
    pub(crate) connection_add_failed_staked_node: AtomicUsize,
    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
    pub(crate) connection_add_failed_on_pruning: AtomicUsize,
    pub(crate) connection_setup_timeout: AtomicUsize,
    pub(crate) connection_setup_error: AtomicUsize,
    pub(crate) connection_setup_error_closed: AtomicUsize,
    pub(crate) connection_setup_error_timed_out: AtomicUsize,
    pub(crate) connection_setup_error_transport: AtomicUsize,
    pub(crate) connection_setup_error_app_closed: AtomicUsize,
    pub(crate) connection_setup_error_reset: AtomicUsize,
    pub(crate) connection_setup_error_locally_closed: AtomicUsize,
    pub(crate) connection_removed: AtomicUsize,
    pub(crate) connection_remove_failed: AtomicUsize,
    pub(crate) connection_rate_limited_across_all: AtomicUsize,
    pub(crate) connection_rate_limited_per_ipaddr: AtomicUsize,
    pub(crate) throttled_streams: AtomicUsize,
    // The three stream-load fields below are not reset by `report`.
    pub(crate) stream_load_ema: AtomicUsize,
    pub(crate) stream_load_ema_overflow: AtomicUsize,
    pub(crate) stream_load_capacity_overflow: AtomicUsize,
    // Cloned and cleared under its lock on every `report`.
    pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
    pub(crate) perf_track_overhead_us: AtomicU64,
    pub(crate) total_staked_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
    pub(crate) throttled_staked_streams: AtomicUsize,
    pub(crate) throttled_unstaked_streams: AtomicUsize,
    // Not reset by `report`.
    pub(crate) connection_rate_limiter_length: AtomicUsize,
    // Not reset by `report`.
    pub(crate) open_connections: AtomicUsize,
    pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
    // The three fields below are not reset by `report`.
    pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
    pub(crate) total_incoming_connection_attempts: AtomicUsize,
    pub(crate) quic_endpoints_count: AtomicUsize,
}
254
impl StreamerStats {
    /// Emits every statistic as a single `datapoint_info!` datapoint called
    /// `name`.
    ///
    /// Fields read with `swap(0, ..)` are reset to zero by this call; fields
    /// read with `load(..)` keep their value. The sampled-packet histogram is
    /// snapshotted and cleared before the datapoint is built.
    ///
    /// # Panics
    ///
    /// Panics if the histogram mutex is poisoned.
    pub fn report(&self, name: &'static str) {
        // Snapshot and clear the histogram inside a narrow scope so the lock
        // is released before the datapoint is emitted.
        let process_sampled_packets_us_hist = {
            let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
            let process_sampled_packets_us_hist = metrics.clone();
            metrics.clear();
            process_sampled_packets_us_hist
        };

        datapoint_info!(
            name,
            // Current values: loaded, not reset.
            (
                "active_connections",
                self.total_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "active_streams",
                self.total_streams.load(Ordering::Relaxed),
                i64
            ),
            // Per-interval counters: swapped with zero.
            (
                "new_connections",
                self.total_new_connections.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "new_streams",
                self.total_new_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "evictions",
                self.num_evictions.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_added_from_staked_peer",
                self.connection_added_from_staked_peer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_added_from_unstaked_peer",
                self.connection_added_from_unstaked_peer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed",
                self.connection_add_failed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_invalid_stream_count",
                self.connection_add_failed_invalid_stream_count
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_staked_node",
                self.connection_add_failed_staked_node
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_unstaked_node",
                self.connection_add_failed_unstaked_node
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_on_pruning",
                self.connection_add_failed_on_pruning
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_removed",
                self.connection_removed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_remove_failed",
                self.connection_remove_failed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_timeout",
                self.connection_setup_timeout.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error",
                self.connection_setup_error.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_timed_out",
                self.connection_setup_error_timed_out
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_closed",
                self.connection_setup_error_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_transport",
                self.connection_setup_error_transport
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_app_closed",
                self.connection_setup_error_app_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_reset",
                self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_locally_closed",
                self.connection_setup_error_locally_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_rate_limited_across_all",
                self.connection_rate_limited_across_all
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_rate_limited_per_ipaddr",
                self.connection_rate_limited_per_ipaddr
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "invalid_stream_size",
                self.invalid_stream_size.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_allocated",
                self.total_packets_allocated.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batches_allocated",
                self.total_packet_batches_allocated
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_sent_for_batching",
                self.total_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "staked_packets_sent_for_batching",
                self.total_staked_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "unstaked_packets_sent_for_batching",
                self.total_unstaked_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "bytes_sent_for_batching",
                self.total_bytes_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_sent_for_batching",
                self.total_chunks_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_sent_to_consumer",
                self.total_packets_sent_to_consumer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "bytes_sent_to_consumer",
                self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_processed_by_batcher",
                self.total_chunks_processed_by_batcher
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_received",
                self.total_chunks_received.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "staked_chunks_received",
                self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "unstaked_chunks_received",
                self.total_unstaked_chunks_received
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batch_send_error",
                self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_error",
                self.total_handle_chunk_to_packet_batcher_send_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batches_sent",
                self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batch_empty",
                self.total_packet_batches_none.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_read_errors",
                self.total_stream_read_errors.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_read_timeouts",
                self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "throttled_streams",
                self.throttled_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            // Stream-load values are loaded, not reset.
            (
                "stream_load_ema",
                self.stream_load_ema.load(Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_ema_overflow",
                self.stream_load_ema_overflow.load(Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_capacity_overflow",
                self.stream_load_capacity_overflow.load(Ordering::Relaxed),
                i64
            ),
            (
                "throttled_unstaked_streams",
                self.throttled_unstaked_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "throttled_staked_streams",
                self.throttled_staked_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            // Histogram statistics come from the snapshot taken above and
            // fall back to 0 when the histogram yields no value.
            (
                "process_sampled_packets_us_90pct",
                process_sampled_packets_us_hist
                    .percentile(90.0)
                    .unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_min",
                process_sampled_packets_us_hist.minimum().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_max",
                process_sampled_packets_us_hist.maximum().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_mean",
                process_sampled_packets_us_hist.mean().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_count",
                process_sampled_packets_us_hist.entries(),
                i64
            ),
            (
                "perf_track_overhead_us",
                self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
                i64
            ),
            // The next five values are loaded, not reset.
            (
                "connection_rate_limiter_length",
                self.connection_rate_limiter_length.load(Ordering::Relaxed),
                i64
            ),
            (
                "outstanding_incoming_connection_attempts",
                self.outstanding_incoming_connection_attempts
                    .load(Ordering::Relaxed),
                i64
            ),
            (
                "total_incoming_connection_attempts",
                self.total_incoming_connection_attempts
                    .load(Ordering::Relaxed),
                i64
            ),
            (
                "quic_endpoints_count",
                self.quic_endpoints_count.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_connections",
                self.open_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "refused_connections_too_many_open_connections",
                self.refused_connections_too_many_open_connections
                    .swap(0, Ordering::Relaxed),
                i64
            ),
        );
    }
}
607
608#[allow(clippy::too_many_arguments)]
609pub fn spawn_server(
610 thread_name: &'static str,
611 metrics_name: &'static str,
612 socket: UdpSocket,
613 keypair: &Keypair,
614 packet_sender: Sender<PacketBatch>,
615 exit: Arc<AtomicBool>,
616 max_connections_per_peer: usize,
617 staked_nodes: Arc<RwLock<StakedNodes>>,
618 max_staked_connections: usize,
619 max_unstaked_connections: usize,
620 max_streams_per_ms: u64,
621 max_connections_per_ipaddr_per_min: u64,
622 wait_for_chunk_timeout: Duration,
623 coalesce: Duration,
624) -> Result<SpawnServerResult, QuicServerError> {
625 spawn_server_multi(
626 thread_name,
627 metrics_name,
628 vec![socket],
629 keypair,
630 packet_sender,
631 exit,
632 max_connections_per_peer,
633 staked_nodes,
634 max_staked_connections,
635 max_unstaked_connections,
636 max_streams_per_ms,
637 max_connections_per_ipaddr_per_min,
638 wait_for_chunk_timeout,
639 coalesce,
640 )
641}
642
643#[allow(clippy::too_many_arguments)]
644pub fn spawn_server_multi(
645 thread_name: &'static str,
646 metrics_name: &'static str,
647 sockets: Vec<UdpSocket>,
648 keypair: &Keypair,
649 packet_sender: Sender<PacketBatch>,
650 exit: Arc<AtomicBool>,
651 max_connections_per_peer: usize,
652 staked_nodes: Arc<RwLock<StakedNodes>>,
653 max_staked_connections: usize,
654 max_unstaked_connections: usize,
655 max_streams_per_ms: u64,
656 max_connections_per_ipaddr_per_min: u64,
657 wait_for_chunk_timeout: Duration,
658 coalesce: Duration,
659) -> Result<SpawnServerResult, QuicServerError> {
660 let runtime = rt(format!("{thread_name}Rt"));
661 let result = {
662 let _guard = runtime.enter();
663 crate::nonblocking::quic::spawn_server_multi(
664 metrics_name,
665 sockets,
666 keypair,
667 packet_sender,
668 exit,
669 max_connections_per_peer,
670 staked_nodes,
671 max_staked_connections,
672 max_unstaked_connections,
673 max_streams_per_ms,
674 max_connections_per_ipaddr_per_min,
675 wait_for_chunk_timeout,
676 coalesce,
677 )
678 }?;
679 let handle = thread::Builder::new()
680 .name(thread_name.into())
681 .spawn(move || {
682 if let Err(e) = runtime.block_on(result.thread) {
683 warn!("error from runtime.block_on: {:?}", e);
684 }
685 })
686 .unwrap();
687 let updater = EndpointKeyUpdater {
688 endpoints: result.endpoints.clone(),
689 };
690 Ok(SpawnServerResult {
691 endpoints: result.endpoints,
692 thread: handle,
693 key_updater: Arc::new(updater),
694 })
695}
696
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::nonblocking::quic::{
            test::*, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
        },
        crossbeam_channel::unbounded,
        solana_sdk::net::DEFAULT_TPU_COALESCE,
        std::net::SocketAddr,
    };

    /// Starts a server on an ephemeral loopback port with a fresh keypair and
    /// an empty staked-node set, forwarding packets to `sender`.
    ///
    /// Returns the server thread handle, its exit flag and its bound address.
    fn spawn_test_server(
        sender: crossbeam_channel::Sender<PacketBatch>,
        max_connections_per_peer: usize,
        max_unstaked_connections: usize,
    ) -> (std::thread::JoinHandle<()>, Arc<AtomicBool>, SocketAddr) {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let server_address = socket.local_addr().unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let identity = Keypair::new();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let SpawnServerResult { thread, .. } = spawn_server(
            "solQuicTest",
            "quic_streamer_test",
            socket,
            &identity,
            sender,
            exit.clone(),
            max_connections_per_peer,
            staked_nodes,
            MAX_STAKED_CONNECTIONS,
            max_unstaked_connections,
            DEFAULT_MAX_STREAMS_PER_MS,
            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
            DEFAULT_TPU_COALESCE,
        )
        .unwrap();
        (thread, exit, server_address)
    }

    /// Signals the server to exit and waits for its thread to finish.
    fn shutdown(server: std::thread::JoinHandle<()>, exit: &AtomicBool) {
        exit.store(true, Ordering::Relaxed);
        server.join().unwrap();
    }

    /// Default test server: one connection per peer and the standard
    /// unstaked connection limit.
    fn setup_quic_server() -> (
        std::thread::JoinHandle<()>,
        Arc<AtomicBool>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
    ) {
        let (sender, receiver) = unbounded();
        let (t, exit, server_address) = spawn_test_server(sender, 1, MAX_UNSTAKED_CONNECTIONS);
        (t, exit, receiver, server_address)
    }

    #[test]
    fn test_quic_server_exit() {
        let (t, exit, _receiver, _server_address) = setup_quic_server();
        shutdown(t, &exit);
    }

    #[test]
    fn test_quic_timeout() {
        solana_logger::setup();
        let (t, exit, receiver, server_address) = setup_quic_server();
        let client_runtime = rt("solQuicTestRt".to_string());
        client_runtime.block_on(check_timeout(receiver, server_address));
        shutdown(t, &exit);
    }

    #[test]
    fn test_quic_server_block_multiple_connections() {
        solana_logger::setup();
        let (t, exit, _receiver, server_address) = setup_quic_server();

        let client_runtime = rt("solQuicTestRt".to_string());
        client_runtime.block_on(check_block_multiple_connections(server_address));
        shutdown(t, &exit);
    }

    #[test]
    fn test_quic_server_multiple_streams() {
        solana_logger::setup();
        // Two connections per peer are allowed for this scenario.
        let (sender, receiver) = unbounded();
        let (t, exit, server_address) = spawn_test_server(sender, 2, MAX_UNSTAKED_CONNECTIONS);

        let client_runtime = rt("solQuicTestRt".to_string());
        client_runtime.block_on(check_multiple_streams(receiver, server_address));
        shutdown(t, &exit);
    }

    #[test]
    fn test_quic_server_multiple_writes() {
        solana_logger::setup();
        let (t, exit, receiver, server_address) = setup_quic_server();

        let client_runtime = rt("solQuicTestRt".to_string());
        client_runtime.block_on(check_multiple_writes(receiver, server_address, None));
        shutdown(t, &exit);
    }

    #[test]
    fn test_quic_server_unstaked_node_connect_failure() {
        solana_logger::setup();
        // The receiving half is dropped right away, and the unstaked
        // connection limit is zero.
        let (sender, _) = unbounded();
        let (t, exit, server_address) = spawn_test_server(sender, 1, 0);

        let client_runtime = rt("solQuicTestRt".to_string());
        client_runtime.block_on(check_unstaked_node_connect_failure(server_address));
        shutdown(t, &exit);
    }
}