solana_streamer/
quic.rs

1use {
2    crate::{
3        nonblocking::quic::ALPN_TPU_PROTOCOL_ID, streamer::StakedNodes,
4        tls_certificates::new_dummy_x509_certificate,
5    },
6    crossbeam_channel::Sender,
7    pem::Pem,
8    quinn::{
9        crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
10        Endpoint, IdleTimeout, ServerConfig,
11    },
12    rustls::{
13        pki_types::{CertificateDer, UnixTime},
14        server::danger::ClientCertVerified,
15        DistinguishedName, KeyLogFile,
16    },
17    solana_perf::packet::PacketBatch,
18    solana_sdk::{
19        packet::PACKET_DATA_SIZE,
20        quic::{NotifyKeyUpdate, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
21        signature::Keypair,
22    },
23    std::{
24        net::UdpSocket,
25        sync::{
26            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
27            Arc, Mutex, RwLock,
28        },
29        thread,
30        time::Duration,
31    },
32    tokio::runtime::Runtime,
33};
34
35pub const MAX_STAKED_CONNECTIONS: usize = 2000;
36pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
37
38// This will be adjusted and parameterized in follow-on PRs.
39pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;
40
41#[derive(Debug)]
42pub struct SkipClientVerification(Arc<rustls::crypto::CryptoProvider>);
43
44impl SkipClientVerification {
45    pub fn new() -> Arc<Self> {
46        Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider())))
47    }
48}
49
50pub struct SpawnServerResult {
51    pub endpoints: Vec<Endpoint>,
52    pub thread: thread::JoinHandle<()>,
53    pub key_updater: Arc<EndpointKeyUpdater>,
54}
55
56impl rustls::server::danger::ClientCertVerifier for SkipClientVerification {
57    fn verify_client_cert(
58        &self,
59        _end_entity: &CertificateDer,
60        _intermediates: &[CertificateDer],
61        _now: UnixTime,
62    ) -> Result<ClientCertVerified, rustls::Error> {
63        Ok(rustls::server::danger::ClientCertVerified::assertion())
64    }
65
66    fn root_hint_subjects(&self) -> &[DistinguishedName] {
67        &[]
68    }
69
70    fn verify_tls12_signature(
71        &self,
72        message: &[u8],
73        cert: &rustls::pki_types::CertificateDer<'_>,
74        dss: &rustls::DigitallySignedStruct,
75    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
76        rustls::crypto::verify_tls12_signature(
77            message,
78            cert,
79            dss,
80            &self.0.signature_verification_algorithms,
81        )
82    }
83
84    fn verify_tls13_signature(
85        &self,
86        message: &[u8],
87        cert: &rustls::pki_types::CertificateDer<'_>,
88        dss: &rustls::DigitallySignedStruct,
89    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
90        rustls::crypto::verify_tls13_signature(
91            message,
92            cert,
93            dss,
94            &self.0.signature_verification_algorithms,
95        )
96    }
97
98    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
99        self.0.signature_verification_algorithms.supported_schemes()
100    }
101
102    fn offer_client_auth(&self) -> bool {
103        true
104    }
105
106    fn client_auth_mandatory(&self) -> bool {
107        self.offer_client_auth()
108    }
109}
110
111/// Returns default server configuration along with its PEM certificate chain.
112#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
113pub(crate) fn configure_server(
114    identity_keypair: &Keypair,
115) -> Result<(ServerConfig, String), QuicServerError> {
116    let (cert, priv_key) = new_dummy_x509_certificate(identity_keypair);
117    let cert_chain_pem_parts = vec![Pem {
118        tag: "CERTIFICATE".to_string(),
119        contents: cert.as_ref().to_vec(),
120    }];
121    let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
122
123    let mut server_tls_config = rustls::ServerConfig::builder()
124        .with_client_cert_verifier(SkipClientVerification::new())
125        .with_single_cert(vec![cert], priv_key)?;
126    server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
127    server_tls_config.key_log = Arc::new(KeyLogFile::new());
128    let quic_server_config = QuicServerConfig::try_from(server_tls_config)?;
129
130    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
131    let config = Arc::get_mut(&mut server_config.transport).unwrap();
132
133    // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
134    const MAX_CONCURRENT_UNI_STREAMS: u32 =
135        (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS.saturating_mul(2)) as u32;
136    config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
137    config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
138    config.receive_window((PACKET_DATA_SIZE as u32).into());
139    let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
140    config.max_idle_timeout(Some(timeout));
141
142    // disable bidi & datagrams
143    const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
144    config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
145    config.datagram_receive_buffer_size(None);
146
147    // Disable GSO. The server only accepts inbound unidirectional streams initiated by clients,
148    // which means that reply data never exceeds one MTU. By disabling GSO, we make
149    // quinn_proto::Connection::poll_transmit allocate only 1 MTU vs 10 * MTU for _each_ transmit.
150    // See https://github.com/anza-xyz/agave/pull/1647.
151    config.enable_segmentation_offload(false);
152
153    Ok((server_config, cert_chain_pem))
154}
155
156pub fn rt(name: String) -> Runtime {
157    tokio::runtime::Builder::new_multi_thread()
158        .thread_name(name)
159        .enable_all()
160        .build()
161        .unwrap()
162}
163
164#[derive(thiserror::Error, Debug)]
165pub enum QuicServerError {
166    #[error("Endpoint creation failed: {0}")]
167    EndpointFailed(std::io::Error),
168    #[error("TLS error: {0}")]
169    TlsError(#[from] rustls::Error),
170    #[error("No initial cipher suite")]
171    NoInitialCipherSuite(#[from] NoInitialCipherSuite),
172}
173
174pub struct EndpointKeyUpdater {
175    endpoints: Vec<Endpoint>,
176}
177
178impl NotifyKeyUpdate for EndpointKeyUpdater {
179    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
180        let (config, _) = configure_server(key)?;
181        for endpoint in &self.endpoints {
182            endpoint.set_server_config(Some(config.clone()));
183        }
184        Ok(())
185    }
186}
187
188#[derive(Default)]
189pub struct StreamerStats {
190    pub(crate) total_connections: AtomicUsize,
191    pub(crate) total_new_connections: AtomicUsize,
192    pub(crate) total_streams: AtomicUsize,
193    pub(crate) total_new_streams: AtomicUsize,
194    pub(crate) invalid_stream_size: AtomicUsize,
195    pub(crate) total_packets_allocated: AtomicUsize,
196    pub(crate) total_packet_batches_allocated: AtomicUsize,
197    pub(crate) total_chunks_received: AtomicUsize,
198    pub(crate) total_staked_chunks_received: AtomicUsize,
199    pub(crate) total_unstaked_chunks_received: AtomicUsize,
200    pub(crate) total_packet_batch_send_err: AtomicUsize,
201    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
202    pub(crate) total_packet_batches_sent: AtomicUsize,
203    pub(crate) total_packet_batches_none: AtomicUsize,
204    pub(crate) total_packets_sent_for_batching: AtomicUsize,
205    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
206    pub(crate) total_chunks_sent_for_batching: AtomicUsize,
207    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
208    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
209    pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
210    pub(crate) total_stream_read_errors: AtomicUsize,
211    pub(crate) total_stream_read_timeouts: AtomicUsize,
212    pub(crate) num_evictions: AtomicUsize,
213    pub(crate) connection_added_from_staked_peer: AtomicUsize,
214    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
215    pub(crate) connection_add_failed: AtomicUsize,
216    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
217    pub(crate) connection_add_failed_staked_node: AtomicUsize,
218    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
219    pub(crate) connection_add_failed_on_pruning: AtomicUsize,
220    pub(crate) connection_setup_timeout: AtomicUsize,
221    pub(crate) connection_setup_error: AtomicUsize,
222    pub(crate) connection_setup_error_closed: AtomicUsize,
223    pub(crate) connection_setup_error_timed_out: AtomicUsize,
224    pub(crate) connection_setup_error_transport: AtomicUsize,
225    pub(crate) connection_setup_error_app_closed: AtomicUsize,
226    pub(crate) connection_setup_error_reset: AtomicUsize,
227    pub(crate) connection_setup_error_locally_closed: AtomicUsize,
228    pub(crate) connection_removed: AtomicUsize,
229    pub(crate) connection_remove_failed: AtomicUsize,
230    // Number of connections to the endpoint exceeding the allowed limit
231    // regardless of the source IP address.
232    pub(crate) connection_rate_limited_across_all: AtomicUsize,
233    // Per IP rate-limiting is triggered each time when there are too many connections
234    // opened from a particular IP address.
235    pub(crate) connection_rate_limited_per_ipaddr: AtomicUsize,
236    pub(crate) throttled_streams: AtomicUsize,
237    pub(crate) stream_load_ema: AtomicUsize,
238    pub(crate) stream_load_ema_overflow: AtomicUsize,
239    pub(crate) stream_load_capacity_overflow: AtomicUsize,
240    pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
241    pub(crate) perf_track_overhead_us: AtomicU64,
242    pub(crate) total_staked_packets_sent_for_batching: AtomicUsize,
243    pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
244    pub(crate) throttled_staked_streams: AtomicUsize,
245    pub(crate) throttled_unstaked_streams: AtomicUsize,
246    pub(crate) connection_rate_limiter_length: AtomicUsize,
247    // All connections in various states such as Incoming, Connecting, Connection
248    pub(crate) open_connections: AtomicUsize,
249    pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
250    pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
251    pub(crate) total_incoming_connection_attempts: AtomicUsize,
252    pub(crate) quic_endpoints_count: AtomicUsize,
253}
254
255impl StreamerStats {
256    pub fn report(&self, name: &'static str) {
257        let process_sampled_packets_us_hist = {
258            let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
259            let process_sampled_packets_us_hist = metrics.clone();
260            metrics.clear();
261            process_sampled_packets_us_hist
262        };
263
264        datapoint_info!(
265            name,
266            (
267                "active_connections",
268                self.total_connections.load(Ordering::Relaxed),
269                i64
270            ),
271            (
272                "active_streams",
273                self.total_streams.load(Ordering::Relaxed),
274                i64
275            ),
276            (
277                "new_connections",
278                self.total_new_connections.swap(0, Ordering::Relaxed),
279                i64
280            ),
281            (
282                "new_streams",
283                self.total_new_streams.swap(0, Ordering::Relaxed),
284                i64
285            ),
286            (
287                "evictions",
288                self.num_evictions.swap(0, Ordering::Relaxed),
289                i64
290            ),
291            (
292                "connection_added_from_staked_peer",
293                self.connection_added_from_staked_peer
294                    .swap(0, Ordering::Relaxed),
295                i64
296            ),
297            (
298                "connection_added_from_unstaked_peer",
299                self.connection_added_from_unstaked_peer
300                    .swap(0, Ordering::Relaxed),
301                i64
302            ),
303            (
304                "connection_add_failed",
305                self.connection_add_failed.swap(0, Ordering::Relaxed),
306                i64
307            ),
308            (
309                "connection_add_failed_invalid_stream_count",
310                self.connection_add_failed_invalid_stream_count
311                    .swap(0, Ordering::Relaxed),
312                i64
313            ),
314            (
315                "connection_add_failed_staked_node",
316                self.connection_add_failed_staked_node
317                    .swap(0, Ordering::Relaxed),
318                i64
319            ),
320            (
321                "connection_add_failed_unstaked_node",
322                self.connection_add_failed_unstaked_node
323                    .swap(0, Ordering::Relaxed),
324                i64
325            ),
326            (
327                "connection_add_failed_on_pruning",
328                self.connection_add_failed_on_pruning
329                    .swap(0, Ordering::Relaxed),
330                i64
331            ),
332            (
333                "connection_removed",
334                self.connection_removed.swap(0, Ordering::Relaxed),
335                i64
336            ),
337            (
338                "connection_remove_failed",
339                self.connection_remove_failed.swap(0, Ordering::Relaxed),
340                i64
341            ),
342            (
343                "connection_setup_timeout",
344                self.connection_setup_timeout.swap(0, Ordering::Relaxed),
345                i64
346            ),
347            (
348                "connection_setup_error",
349                self.connection_setup_error.swap(0, Ordering::Relaxed),
350                i64
351            ),
352            (
353                "connection_setup_error_timed_out",
354                self.connection_setup_error_timed_out
355                    .swap(0, Ordering::Relaxed),
356                i64
357            ),
358            (
359                "connection_setup_error_closed",
360                self.connection_setup_error_closed
361                    .swap(0, Ordering::Relaxed),
362                i64
363            ),
364            (
365                "connection_setup_error_transport",
366                self.connection_setup_error_transport
367                    .swap(0, Ordering::Relaxed),
368                i64
369            ),
370            (
371                "connection_setup_error_app_closed",
372                self.connection_setup_error_app_closed
373                    .swap(0, Ordering::Relaxed),
374                i64
375            ),
376            (
377                "connection_setup_error_reset",
378                self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
379                i64
380            ),
381            (
382                "connection_setup_error_locally_closed",
383                self.connection_setup_error_locally_closed
384                    .swap(0, Ordering::Relaxed),
385                i64
386            ),
387            (
388                "connection_rate_limited_across_all",
389                self.connection_rate_limited_across_all
390                    .swap(0, Ordering::Relaxed),
391                i64
392            ),
393            (
394                "connection_rate_limited_per_ipaddr",
395                self.connection_rate_limited_per_ipaddr
396                    .swap(0, Ordering::Relaxed),
397                i64
398            ),
399            (
400                "invalid_stream_size",
401                self.invalid_stream_size.swap(0, Ordering::Relaxed),
402                i64
403            ),
404            (
405                "packets_allocated",
406                self.total_packets_allocated.swap(0, Ordering::Relaxed),
407                i64
408            ),
409            (
410                "packet_batches_allocated",
411                self.total_packet_batches_allocated
412                    .swap(0, Ordering::Relaxed),
413                i64
414            ),
415            (
416                "packets_sent_for_batching",
417                self.total_packets_sent_for_batching
418                    .swap(0, Ordering::Relaxed),
419                i64
420            ),
421            (
422                "staked_packets_sent_for_batching",
423                self.total_staked_packets_sent_for_batching
424                    .swap(0, Ordering::Relaxed),
425                i64
426            ),
427            (
428                "unstaked_packets_sent_for_batching",
429                self.total_unstaked_packets_sent_for_batching
430                    .swap(0, Ordering::Relaxed),
431                i64
432            ),
433            (
434                "bytes_sent_for_batching",
435                self.total_bytes_sent_for_batching
436                    .swap(0, Ordering::Relaxed),
437                i64
438            ),
439            (
440                "chunks_sent_for_batching",
441                self.total_chunks_sent_for_batching
442                    .swap(0, Ordering::Relaxed),
443                i64
444            ),
445            (
446                "packets_sent_to_consumer",
447                self.total_packets_sent_to_consumer
448                    .swap(0, Ordering::Relaxed),
449                i64
450            ),
451            (
452                "bytes_sent_to_consumer",
453                self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
454                i64
455            ),
456            (
457                "chunks_processed_by_batcher",
458                self.total_chunks_processed_by_batcher
459                    .swap(0, Ordering::Relaxed),
460                i64
461            ),
462            (
463                "chunks_received",
464                self.total_chunks_received.swap(0, Ordering::Relaxed),
465                i64
466            ),
467            (
468                "staked_chunks_received",
469                self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
470                i64
471            ),
472            (
473                "unstaked_chunks_received",
474                self.total_unstaked_chunks_received
475                    .swap(0, Ordering::Relaxed),
476                i64
477            ),
478            (
479                "packet_batch_send_error",
480                self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
481                i64
482            ),
483            (
484                "handle_chunk_to_packet_batcher_send_error",
485                self.total_handle_chunk_to_packet_batcher_send_err
486                    .swap(0, Ordering::Relaxed),
487                i64
488            ),
489            (
490                "packet_batches_sent",
491                self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
492                i64
493            ),
494            (
495                "packet_batch_empty",
496                self.total_packet_batches_none.swap(0, Ordering::Relaxed),
497                i64
498            ),
499            (
500                "stream_read_errors",
501                self.total_stream_read_errors.swap(0, Ordering::Relaxed),
502                i64
503            ),
504            (
505                "stream_read_timeouts",
506                self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
507                i64
508            ),
509            (
510                "throttled_streams",
511                self.throttled_streams.swap(0, Ordering::Relaxed),
512                i64
513            ),
514            (
515                "stream_load_ema",
516                self.stream_load_ema.load(Ordering::Relaxed),
517                i64
518            ),
519            (
520                "stream_load_ema_overflow",
521                self.stream_load_ema_overflow.load(Ordering::Relaxed),
522                i64
523            ),
524            (
525                "stream_load_capacity_overflow",
526                self.stream_load_capacity_overflow.load(Ordering::Relaxed),
527                i64
528            ),
529            (
530                "throttled_unstaked_streams",
531                self.throttled_unstaked_streams.swap(0, Ordering::Relaxed),
532                i64
533            ),
534            (
535                "throttled_staked_streams",
536                self.throttled_staked_streams.swap(0, Ordering::Relaxed),
537                i64
538            ),
539            (
540                "process_sampled_packets_us_90pct",
541                process_sampled_packets_us_hist
542                    .percentile(90.0)
543                    .unwrap_or(0),
544                i64
545            ),
546            (
547                "process_sampled_packets_us_min",
548                process_sampled_packets_us_hist.minimum().unwrap_or(0),
549                i64
550            ),
551            (
552                "process_sampled_packets_us_max",
553                process_sampled_packets_us_hist.maximum().unwrap_or(0),
554                i64
555            ),
556            (
557                "process_sampled_packets_us_mean",
558                process_sampled_packets_us_hist.mean().unwrap_or(0),
559                i64
560            ),
561            (
562                "process_sampled_packets_count",
563                process_sampled_packets_us_hist.entries(),
564                i64
565            ),
566            (
567                "perf_track_overhead_us",
568                self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
569                i64
570            ),
571            (
572                "connection_rate_limiter_length",
573                self.connection_rate_limiter_length.load(Ordering::Relaxed),
574                i64
575            ),
576            (
577                "outstanding_incoming_connection_attempts",
578                self.outstanding_incoming_connection_attempts
579                    .load(Ordering::Relaxed),
580                i64
581            ),
582            (
583                "total_incoming_connection_attempts",
584                self.total_incoming_connection_attempts
585                    .load(Ordering::Relaxed),
586                i64
587            ),
588            (
589                "quic_endpoints_count",
590                self.quic_endpoints_count.load(Ordering::Relaxed),
591                i64
592            ),
593            (
594                "open_connections",
595                self.open_connections.load(Ordering::Relaxed),
596                i64
597            ),
598            (
599                "refused_connections_too_many_open_connections",
600                self.refused_connections_too_many_open_connections
601                    .swap(0, Ordering::Relaxed),
602                i64
603            ),
604        );
605    }
606}
607
608#[allow(clippy::too_many_arguments)]
609pub fn spawn_server(
610    thread_name: &'static str,
611    metrics_name: &'static str,
612    socket: UdpSocket,
613    keypair: &Keypair,
614    packet_sender: Sender<PacketBatch>,
615    exit: Arc<AtomicBool>,
616    max_connections_per_peer: usize,
617    staked_nodes: Arc<RwLock<StakedNodes>>,
618    max_staked_connections: usize,
619    max_unstaked_connections: usize,
620    max_streams_per_ms: u64,
621    max_connections_per_ipaddr_per_min: u64,
622    wait_for_chunk_timeout: Duration,
623    coalesce: Duration,
624) -> Result<SpawnServerResult, QuicServerError> {
625    spawn_server_multi(
626        thread_name,
627        metrics_name,
628        vec![socket],
629        keypair,
630        packet_sender,
631        exit,
632        max_connections_per_peer,
633        staked_nodes,
634        max_staked_connections,
635        max_unstaked_connections,
636        max_streams_per_ms,
637        max_connections_per_ipaddr_per_min,
638        wait_for_chunk_timeout,
639        coalesce,
640    )
641}
642
643#[allow(clippy::too_many_arguments)]
644pub fn spawn_server_multi(
645    thread_name: &'static str,
646    metrics_name: &'static str,
647    sockets: Vec<UdpSocket>,
648    keypair: &Keypair,
649    packet_sender: Sender<PacketBatch>,
650    exit: Arc<AtomicBool>,
651    max_connections_per_peer: usize,
652    staked_nodes: Arc<RwLock<StakedNodes>>,
653    max_staked_connections: usize,
654    max_unstaked_connections: usize,
655    max_streams_per_ms: u64,
656    max_connections_per_ipaddr_per_min: u64,
657    wait_for_chunk_timeout: Duration,
658    coalesce: Duration,
659) -> Result<SpawnServerResult, QuicServerError> {
660    let runtime = rt(format!("{thread_name}Rt"));
661    let result = {
662        let _guard = runtime.enter();
663        crate::nonblocking::quic::spawn_server_multi(
664            metrics_name,
665            sockets,
666            keypair,
667            packet_sender,
668            exit,
669            max_connections_per_peer,
670            staked_nodes,
671            max_staked_connections,
672            max_unstaked_connections,
673            max_streams_per_ms,
674            max_connections_per_ipaddr_per_min,
675            wait_for_chunk_timeout,
676            coalesce,
677        )
678    }?;
679    let handle = thread::Builder::new()
680        .name(thread_name.into())
681        .spawn(move || {
682            if let Err(e) = runtime.block_on(result.thread) {
683                warn!("error from runtime.block_on: {:?}", e);
684            }
685        })
686        .unwrap();
687    let updater = EndpointKeyUpdater {
688        endpoints: result.endpoints.clone(),
689    };
690    Ok(SpawnServerResult {
691        endpoints: result.endpoints,
692        thread: handle,
693        key_updater: Arc::new(updater),
694    })
695}
696
697#[cfg(test)]
698mod test {
699    use {
700        super::*,
701        crate::nonblocking::quic::{
702            test::*, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
703            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
704        },
705        crossbeam_channel::unbounded,
706        solana_sdk::net::DEFAULT_TPU_COALESCE,
707        std::net::SocketAddr,
708    };
709
710    fn setup_quic_server() -> (
711        std::thread::JoinHandle<()>,
712        Arc<AtomicBool>,
713        crossbeam_channel::Receiver<PacketBatch>,
714        SocketAddr,
715    ) {
716        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
717        let exit = Arc::new(AtomicBool::new(false));
718        let (sender, receiver) = unbounded();
719        let keypair = Keypair::new();
720        let server_address = s.local_addr().unwrap();
721        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
722        let SpawnServerResult {
723            endpoints: _,
724            thread: t,
725            key_updater: _,
726        } = spawn_server(
727            "solQuicTest",
728            "quic_streamer_test",
729            s,
730            &keypair,
731            sender,
732            exit.clone(),
733            1,
734            staked_nodes,
735            MAX_STAKED_CONNECTIONS,
736            MAX_UNSTAKED_CONNECTIONS,
737            DEFAULT_MAX_STREAMS_PER_MS,
738            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
739            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
740            DEFAULT_TPU_COALESCE,
741        )
742        .unwrap();
743        (t, exit, receiver, server_address)
744    }
745
746    #[test]
747    fn test_quic_server_exit() {
748        let (t, exit, _receiver, _server_address) = setup_quic_server();
749        exit.store(true, Ordering::Relaxed);
750        t.join().unwrap();
751    }
752
753    #[test]
754    fn test_quic_timeout() {
755        solana_logger::setup();
756        let (t, exit, receiver, server_address) = setup_quic_server();
757        let runtime = rt("solQuicTestRt".to_string());
758        runtime.block_on(check_timeout(receiver, server_address));
759        exit.store(true, Ordering::Relaxed);
760        t.join().unwrap();
761    }
762
763    #[test]
764    fn test_quic_server_block_multiple_connections() {
765        solana_logger::setup();
766        let (t, exit, _receiver, server_address) = setup_quic_server();
767
768        let runtime = rt("solQuicTestRt".to_string());
769        runtime.block_on(check_block_multiple_connections(server_address));
770        exit.store(true, Ordering::Relaxed);
771        t.join().unwrap();
772    }
773
774    #[test]
775    fn test_quic_server_multiple_streams() {
776        solana_logger::setup();
777        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
778        let exit = Arc::new(AtomicBool::new(false));
779        let (sender, receiver) = unbounded();
780        let keypair = Keypair::new();
781        let server_address = s.local_addr().unwrap();
782        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
783        let SpawnServerResult {
784            endpoints: _,
785            thread: t,
786            key_updater: _,
787        } = spawn_server(
788            "solQuicTest",
789            "quic_streamer_test",
790            s,
791            &keypair,
792            sender,
793            exit.clone(),
794            2,
795            staked_nodes,
796            MAX_STAKED_CONNECTIONS,
797            MAX_UNSTAKED_CONNECTIONS,
798            DEFAULT_MAX_STREAMS_PER_MS,
799            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
800            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
801            DEFAULT_TPU_COALESCE,
802        )
803        .unwrap();
804
805        let runtime = rt("solQuicTestRt".to_string());
806        runtime.block_on(check_multiple_streams(receiver, server_address));
807        exit.store(true, Ordering::Relaxed);
808        t.join().unwrap();
809    }
810
811    #[test]
812    fn test_quic_server_multiple_writes() {
813        solana_logger::setup();
814        let (t, exit, receiver, server_address) = setup_quic_server();
815
816        let runtime = rt("solQuicTestRt".to_string());
817        runtime.block_on(check_multiple_writes(receiver, server_address, None));
818        exit.store(true, Ordering::Relaxed);
819        t.join().unwrap();
820    }
821
822    #[test]
823    fn test_quic_server_unstaked_node_connect_failure() {
824        solana_logger::setup();
825        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
826        let exit = Arc::new(AtomicBool::new(false));
827        let (sender, _) = unbounded();
828        let keypair = Keypair::new();
829        let server_address = s.local_addr().unwrap();
830        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
831        let SpawnServerResult {
832            endpoints: _,
833            thread: t,
834            key_updater: _,
835        } = spawn_server(
836            "solQuicTest",
837            "quic_streamer_test",
838            s,
839            &keypair,
840            sender,
841            exit.clone(),
842            1,
843            staked_nodes,
844            MAX_STAKED_CONNECTIONS,
845            0, // Do not allow any connection from unstaked clients/nodes
846            DEFAULT_MAX_STREAMS_PER_MS,
847            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
848            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
849            DEFAULT_TPU_COALESCE,
850        )
851        .unwrap();
852
853        let runtime = rt("solQuicTestRt".to_string());
854        runtime.block_on(check_unstaked_node_connect_failure(server_address));
855        exit.store(true, Ordering::Relaxed);
856        t.join().unwrap();
857    }
858}