solana_streamer/
quic.rs

1use {
2    crate::{
3        nonblocking::quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
4        streamer::StakedNodes,
5    },
6    crossbeam_channel::Sender,
7    pem::Pem,
8    quinn::{
9        crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
10        Endpoint, IdleTimeout, ServerConfig,
11    },
12    rustls::KeyLogFile,
13    solana_keypair::Keypair,
14    solana_packet::PACKET_DATA_SIZE,
15    solana_perf::packet::PacketBatch,
16    solana_quic_definitions::{
17        NotifyKeyUpdate, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
18    },
19    solana_tls_utils::{new_dummy_x509_certificate, tls_server_config_builder},
20    std::{
21        net::UdpSocket,
22        sync::{
23            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
24            Arc, Mutex, RwLock,
25        },
26        thread,
27        time::Duration,
28    },
29    tokio::runtime::Runtime,
30};
31
32// allow multiple connections for NAT and any open/close overlap
33pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
34
35#[deprecated(
36    since = "2.2.0",
37    note = "Use solana_streamer::quic::DEFAULT_MAX_STAKED_CONNECTIONS"
38)]
39pub const MAX_STAKED_CONNECTIONS: usize = 2000;
40
41pub const DEFAULT_MAX_STAKED_CONNECTIONS: usize = 2000;
42
43#[deprecated(
44    since = "2.2.0",
45    note = "Use solana_streamer::quic::DEFAULT_MAX_UNSTAKED_CONNECTIONS"
46)]
47pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
48
49pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 500;
50
51/// Limit to 250K PPS
52pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;
53
54/// The new connections per minute from a particular IP address.
55/// Heuristically set to the default maximum concurrent connections
56/// per IP address. Might be adjusted later.
57pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;
58
59// This will be adjusted and parameterized in follow-on PRs.
60pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;
61// inlined to avoid solana-sdk dep
62pub(crate) const DEFAULT_TPU_COALESCE: Duration = Duration::from_millis(5);
63
64pub struct SpawnServerResult {
65    pub endpoints: Vec<Endpoint>,
66    pub thread: thread::JoinHandle<()>,
67    pub key_updater: Arc<EndpointKeyUpdater>,
68}
69
70/// Controls the the channel size for the PacketBatch coalesce
71pub(crate) const DEFAULT_MAX_COALESCE_CHANNEL_SIZE: usize = 1_000_000;
72
73/// Returns default server configuration along with its PEM certificate chain.
74#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
75pub(crate) fn configure_server(
76    identity_keypair: &Keypair,
77) -> Result<(ServerConfig, String), QuicServerError> {
78    let (cert, priv_key) = new_dummy_x509_certificate(identity_keypair);
79    let cert_chain_pem_parts = vec![Pem {
80        tag: "CERTIFICATE".to_string(),
81        contents: cert.as_ref().to_vec(),
82    }];
83    let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
84
85    let mut server_tls_config =
86        tls_server_config_builder().with_single_cert(vec![cert], priv_key)?;
87    server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
88    server_tls_config.key_log = Arc::new(KeyLogFile::new());
89    let quic_server_config = QuicServerConfig::try_from(server_tls_config)?;
90
91    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
92    let config = Arc::get_mut(&mut server_config.transport).unwrap();
93
94    // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
95    const MAX_CONCURRENT_UNI_STREAMS: u32 =
96        (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS.saturating_mul(2)) as u32;
97    config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
98    config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
99    config.receive_window((PACKET_DATA_SIZE as u32).into());
100    let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
101    config.max_idle_timeout(Some(timeout));
102
103    // disable bidi & datagrams
104    const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
105    config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
106    config.datagram_receive_buffer_size(None);
107
108    // Disable GSO. The server only accepts inbound unidirectional streams initiated by clients,
109    // which means that reply data never exceeds one MTU. By disabling GSO, we make
110    // quinn_proto::Connection::poll_transmit allocate only 1 MTU vs 10 * MTU for _each_ transmit.
111    // See https://github.com/anza-xyz/agave/pull/1647.
112    config.enable_segmentation_offload(false);
113
114    Ok((server_config, cert_chain_pem))
115}
116
117pub fn rt(name: String) -> Runtime {
118    tokio::runtime::Builder::new_multi_thread()
119        .thread_name(name)
120        .enable_all()
121        .build()
122        .unwrap()
123}
124
125#[derive(thiserror::Error, Debug)]
126pub enum QuicServerError {
127    #[error("Endpoint creation failed: {0}")]
128    EndpointFailed(std::io::Error),
129    #[error("TLS error: {0}")]
130    TlsError(#[from] rustls::Error),
131    #[error("No initial cipher suite")]
132    NoInitialCipherSuite(#[from] NoInitialCipherSuite),
133}
134
135pub struct EndpointKeyUpdater {
136    endpoints: Vec<Endpoint>,
137}
138
139impl NotifyKeyUpdate for EndpointKeyUpdater {
140    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
141        let (config, _) = configure_server(key)?;
142        for endpoint in &self.endpoints {
143            endpoint.set_server_config(Some(config.clone()));
144        }
145        Ok(())
146    }
147}
148
149#[derive(Default)]
150pub struct StreamerStats {
151    pub(crate) total_connections: AtomicUsize,
152    pub(crate) total_new_connections: AtomicUsize,
153    pub(crate) total_streams: AtomicUsize,
154    pub(crate) total_new_streams: AtomicUsize,
155    pub(crate) invalid_stream_size: AtomicUsize,
156    pub(crate) total_packets_allocated: AtomicUsize,
157    pub(crate) total_packet_batches_allocated: AtomicUsize,
158    pub(crate) total_chunks_received: AtomicUsize,
159    pub(crate) total_staked_chunks_received: AtomicUsize,
160    pub(crate) total_unstaked_chunks_received: AtomicUsize,
161    pub(crate) total_packet_batch_send_err: AtomicUsize,
162    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
163    pub(crate) total_packet_batches_sent: AtomicUsize,
164    pub(crate) total_packet_batches_none: AtomicUsize,
165    pub(crate) total_packets_sent_for_batching: AtomicUsize,
166    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
167    pub(crate) total_chunks_sent_for_batching: AtomicUsize,
168    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
169    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
170    pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
171    pub(crate) total_stream_read_errors: AtomicUsize,
172    pub(crate) total_stream_read_timeouts: AtomicUsize,
173    pub(crate) num_evictions: AtomicUsize,
174    pub(crate) connection_added_from_staked_peer: AtomicUsize,
175    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
176    pub(crate) connection_add_failed: AtomicUsize,
177    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
178    pub(crate) connection_add_failed_staked_node: AtomicUsize,
179    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
180    pub(crate) connection_add_failed_on_pruning: AtomicUsize,
181    pub(crate) connection_setup_timeout: AtomicUsize,
182    pub(crate) connection_setup_error: AtomicUsize,
183    pub(crate) connection_setup_error_closed: AtomicUsize,
184    pub(crate) connection_setup_error_timed_out: AtomicUsize,
185    pub(crate) connection_setup_error_transport: AtomicUsize,
186    pub(crate) connection_setup_error_app_closed: AtomicUsize,
187    pub(crate) connection_setup_error_reset: AtomicUsize,
188    pub(crate) connection_setup_error_locally_closed: AtomicUsize,
189    pub(crate) connection_removed: AtomicUsize,
190    pub(crate) connection_remove_failed: AtomicUsize,
191    // Number of connections to the endpoint exceeding the allowed limit
192    // regardless of the source IP address.
193    pub(crate) connection_rate_limited_across_all: AtomicUsize,
194    // Per IP rate-limiting is triggered each time when there are too many connections
195    // opened from a particular IP address.
196    pub(crate) connection_rate_limited_per_ipaddr: AtomicUsize,
197    pub(crate) throttled_streams: AtomicUsize,
198    pub(crate) stream_load_ema: AtomicUsize,
199    pub(crate) stream_load_ema_overflow: AtomicUsize,
200    pub(crate) stream_load_capacity_overflow: AtomicUsize,
201    pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
202    pub(crate) perf_track_overhead_us: AtomicU64,
203    pub(crate) total_staked_packets_sent_for_batching: AtomicUsize,
204    pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
205    pub(crate) throttled_staked_streams: AtomicUsize,
206    pub(crate) throttled_unstaked_streams: AtomicUsize,
207    pub(crate) connection_rate_limiter_length: AtomicUsize,
208    // All connections in various states such as Incoming, Connecting, Connection
209    pub(crate) open_connections: AtomicUsize,
210    pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
211    pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
212    pub(crate) total_incoming_connection_attempts: AtomicUsize,
213    pub(crate) quic_endpoints_count: AtomicUsize,
214}
215
216impl StreamerStats {
217    pub fn report(&self, name: &'static str) {
218        let process_sampled_packets_us_hist = {
219            let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
220            let process_sampled_packets_us_hist = metrics.clone();
221            metrics.clear();
222            process_sampled_packets_us_hist
223        };
224
225        datapoint_info!(
226            name,
227            (
228                "active_connections",
229                self.total_connections.load(Ordering::Relaxed),
230                i64
231            ),
232            (
233                "active_streams",
234                self.total_streams.load(Ordering::Relaxed),
235                i64
236            ),
237            (
238                "new_connections",
239                self.total_new_connections.swap(0, Ordering::Relaxed),
240                i64
241            ),
242            (
243                "new_streams",
244                self.total_new_streams.swap(0, Ordering::Relaxed),
245                i64
246            ),
247            (
248                "evictions",
249                self.num_evictions.swap(0, Ordering::Relaxed),
250                i64
251            ),
252            (
253                "connection_added_from_staked_peer",
254                self.connection_added_from_staked_peer
255                    .swap(0, Ordering::Relaxed),
256                i64
257            ),
258            (
259                "connection_added_from_unstaked_peer",
260                self.connection_added_from_unstaked_peer
261                    .swap(0, Ordering::Relaxed),
262                i64
263            ),
264            (
265                "connection_add_failed",
266                self.connection_add_failed.swap(0, Ordering::Relaxed),
267                i64
268            ),
269            (
270                "connection_add_failed_invalid_stream_count",
271                self.connection_add_failed_invalid_stream_count
272                    .swap(0, Ordering::Relaxed),
273                i64
274            ),
275            (
276                "connection_add_failed_staked_node",
277                self.connection_add_failed_staked_node
278                    .swap(0, Ordering::Relaxed),
279                i64
280            ),
281            (
282                "connection_add_failed_unstaked_node",
283                self.connection_add_failed_unstaked_node
284                    .swap(0, Ordering::Relaxed),
285                i64
286            ),
287            (
288                "connection_add_failed_on_pruning",
289                self.connection_add_failed_on_pruning
290                    .swap(0, Ordering::Relaxed),
291                i64
292            ),
293            (
294                "connection_removed",
295                self.connection_removed.swap(0, Ordering::Relaxed),
296                i64
297            ),
298            (
299                "connection_remove_failed",
300                self.connection_remove_failed.swap(0, Ordering::Relaxed),
301                i64
302            ),
303            (
304                "connection_setup_timeout",
305                self.connection_setup_timeout.swap(0, Ordering::Relaxed),
306                i64
307            ),
308            (
309                "connection_setup_error",
310                self.connection_setup_error.swap(0, Ordering::Relaxed),
311                i64
312            ),
313            (
314                "connection_setup_error_timed_out",
315                self.connection_setup_error_timed_out
316                    .swap(0, Ordering::Relaxed),
317                i64
318            ),
319            (
320                "connection_setup_error_closed",
321                self.connection_setup_error_closed
322                    .swap(0, Ordering::Relaxed),
323                i64
324            ),
325            (
326                "connection_setup_error_transport",
327                self.connection_setup_error_transport
328                    .swap(0, Ordering::Relaxed),
329                i64
330            ),
331            (
332                "connection_setup_error_app_closed",
333                self.connection_setup_error_app_closed
334                    .swap(0, Ordering::Relaxed),
335                i64
336            ),
337            (
338                "connection_setup_error_reset",
339                self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
340                i64
341            ),
342            (
343                "connection_setup_error_locally_closed",
344                self.connection_setup_error_locally_closed
345                    .swap(0, Ordering::Relaxed),
346                i64
347            ),
348            (
349                "connection_rate_limited_across_all",
350                self.connection_rate_limited_across_all
351                    .swap(0, Ordering::Relaxed),
352                i64
353            ),
354            (
355                "connection_rate_limited_per_ipaddr",
356                self.connection_rate_limited_per_ipaddr
357                    .swap(0, Ordering::Relaxed),
358                i64
359            ),
360            (
361                "invalid_stream_size",
362                self.invalid_stream_size.swap(0, Ordering::Relaxed),
363                i64
364            ),
365            (
366                "packets_allocated",
367                self.total_packets_allocated.swap(0, Ordering::Relaxed),
368                i64
369            ),
370            (
371                "packet_batches_allocated",
372                self.total_packet_batches_allocated
373                    .swap(0, Ordering::Relaxed),
374                i64
375            ),
376            (
377                "packets_sent_for_batching",
378                self.total_packets_sent_for_batching
379                    .swap(0, Ordering::Relaxed),
380                i64
381            ),
382            (
383                "staked_packets_sent_for_batching",
384                self.total_staked_packets_sent_for_batching
385                    .swap(0, Ordering::Relaxed),
386                i64
387            ),
388            (
389                "unstaked_packets_sent_for_batching",
390                self.total_unstaked_packets_sent_for_batching
391                    .swap(0, Ordering::Relaxed),
392                i64
393            ),
394            (
395                "bytes_sent_for_batching",
396                self.total_bytes_sent_for_batching
397                    .swap(0, Ordering::Relaxed),
398                i64
399            ),
400            (
401                "chunks_sent_for_batching",
402                self.total_chunks_sent_for_batching
403                    .swap(0, Ordering::Relaxed),
404                i64
405            ),
406            (
407                "packets_sent_to_consumer",
408                self.total_packets_sent_to_consumer
409                    .swap(0, Ordering::Relaxed),
410                i64
411            ),
412            (
413                "bytes_sent_to_consumer",
414                self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
415                i64
416            ),
417            (
418                "chunks_processed_by_batcher",
419                self.total_chunks_processed_by_batcher
420                    .swap(0, Ordering::Relaxed),
421                i64
422            ),
423            (
424                "chunks_received",
425                self.total_chunks_received.swap(0, Ordering::Relaxed),
426                i64
427            ),
428            (
429                "staked_chunks_received",
430                self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
431                i64
432            ),
433            (
434                "unstaked_chunks_received",
435                self.total_unstaked_chunks_received
436                    .swap(0, Ordering::Relaxed),
437                i64
438            ),
439            (
440                "packet_batch_send_error",
441                self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
442                i64
443            ),
444            (
445                "handle_chunk_to_packet_batcher_send_error",
446                self.total_handle_chunk_to_packet_batcher_send_err
447                    .swap(0, Ordering::Relaxed),
448                i64
449            ),
450            (
451                "packet_batches_sent",
452                self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
453                i64
454            ),
455            (
456                "packet_batch_empty",
457                self.total_packet_batches_none.swap(0, Ordering::Relaxed),
458                i64
459            ),
460            (
461                "stream_read_errors",
462                self.total_stream_read_errors.swap(0, Ordering::Relaxed),
463                i64
464            ),
465            (
466                "stream_read_timeouts",
467                self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
468                i64
469            ),
470            (
471                "throttled_streams",
472                self.throttled_streams.swap(0, Ordering::Relaxed),
473                i64
474            ),
475            (
476                "stream_load_ema",
477                self.stream_load_ema.load(Ordering::Relaxed),
478                i64
479            ),
480            (
481                "stream_load_ema_overflow",
482                self.stream_load_ema_overflow.load(Ordering::Relaxed),
483                i64
484            ),
485            (
486                "stream_load_capacity_overflow",
487                self.stream_load_capacity_overflow.load(Ordering::Relaxed),
488                i64
489            ),
490            (
491                "throttled_unstaked_streams",
492                self.throttled_unstaked_streams.swap(0, Ordering::Relaxed),
493                i64
494            ),
495            (
496                "throttled_staked_streams",
497                self.throttled_staked_streams.swap(0, Ordering::Relaxed),
498                i64
499            ),
500            (
501                "process_sampled_packets_us_90pct",
502                process_sampled_packets_us_hist
503                    .percentile(90.0)
504                    .unwrap_or(0),
505                i64
506            ),
507            (
508                "process_sampled_packets_us_min",
509                process_sampled_packets_us_hist.minimum().unwrap_or(0),
510                i64
511            ),
512            (
513                "process_sampled_packets_us_max",
514                process_sampled_packets_us_hist.maximum().unwrap_or(0),
515                i64
516            ),
517            (
518                "process_sampled_packets_us_mean",
519                process_sampled_packets_us_hist.mean().unwrap_or(0),
520                i64
521            ),
522            (
523                "process_sampled_packets_count",
524                process_sampled_packets_us_hist.entries(),
525                i64
526            ),
527            (
528                "perf_track_overhead_us",
529                self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
530                i64
531            ),
532            (
533                "connection_rate_limiter_length",
534                self.connection_rate_limiter_length.load(Ordering::Relaxed),
535                i64
536            ),
537            (
538                "outstanding_incoming_connection_attempts",
539                self.outstanding_incoming_connection_attempts
540                    .load(Ordering::Relaxed),
541                i64
542            ),
543            (
544                "total_incoming_connection_attempts",
545                self.total_incoming_connection_attempts
546                    .load(Ordering::Relaxed),
547                i64
548            ),
549            (
550                "quic_endpoints_count",
551                self.quic_endpoints_count.load(Ordering::Relaxed),
552                i64
553            ),
554            (
555                "open_connections",
556                self.open_connections.load(Ordering::Relaxed),
557                i64
558            ),
559            (
560                "refused_connections_too_many_open_connections",
561                self.refused_connections_too_many_open_connections
562                    .swap(0, Ordering::Relaxed),
563                i64
564            ),
565        );
566    }
567}
568
569pub fn spawn_server(
570    thread_name: &'static str,
571    metrics_name: &'static str,
572    socket: UdpSocket,
573    keypair: &Keypair,
574    packet_sender: Sender<PacketBatch>,
575    exit: Arc<AtomicBool>,
576    staked_nodes: Arc<RwLock<StakedNodes>>,
577    quic_server_params: QuicServerParams,
578) -> Result<SpawnServerResult, QuicServerError> {
579    spawn_server_multi(
580        thread_name,
581        metrics_name,
582        vec![socket],
583        keypair,
584        packet_sender,
585        exit,
586        staked_nodes,
587        quic_server_params,
588    )
589}
590
591#[derive(Clone)]
592pub struct QuicServerParams {
593    pub max_connections_per_peer: usize,
594    pub max_staked_connections: usize,
595    pub max_unstaked_connections: usize,
596    pub max_streams_per_ms: u64,
597    pub max_connections_per_ipaddr_per_min: u64,
598    pub wait_for_chunk_timeout: Duration,
599    pub coalesce: Duration,
600    pub coalesce_channel_size: usize,
601}
602
603impl Default for QuicServerParams {
604    fn default() -> Self {
605        QuicServerParams {
606            max_connections_per_peer: 1,
607            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
608            max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
609            max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
610            max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
611            wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
612            coalesce: DEFAULT_TPU_COALESCE,
613            coalesce_channel_size: DEFAULT_MAX_COALESCE_CHANNEL_SIZE,
614        }
615    }
616}
617
618pub fn spawn_server_multi(
619    thread_name: &'static str,
620    metrics_name: &'static str,
621    sockets: Vec<UdpSocket>,
622    keypair: &Keypair,
623    packet_sender: Sender<PacketBatch>,
624    exit: Arc<AtomicBool>,
625    staked_nodes: Arc<RwLock<StakedNodes>>,
626    quic_server_params: QuicServerParams,
627) -> Result<SpawnServerResult, QuicServerError> {
628    let runtime = rt(format!("{thread_name}Rt"));
629    let result = {
630        let _guard = runtime.enter();
631        crate::nonblocking::quic::spawn_server_multi(
632            metrics_name,
633            sockets,
634            keypair,
635            packet_sender,
636            exit,
637            staked_nodes,
638            quic_server_params,
639        )
640    }?;
641    let handle = thread::Builder::new()
642        .name(thread_name.into())
643        .spawn(move || {
644            if let Err(e) = runtime.block_on(result.thread) {
645                warn!("error from runtime.block_on: {:?}", e);
646            }
647        })
648        .unwrap();
649    let updater = EndpointKeyUpdater {
650        endpoints: result.endpoints.clone(),
651    };
652    Ok(SpawnServerResult {
653        endpoints: result.endpoints,
654        thread: handle,
655        key_updater: Arc::new(updater),
656    })
657}
658
659#[cfg(test)]
660mod test {
661    use {
662        super::*,
663        crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
664        crossbeam_channel::unbounded,
665        solana_net_utils::bind_to_localhost,
666        std::net::SocketAddr,
667    };
668
669    fn setup_quic_server() -> (
670        std::thread::JoinHandle<()>,
671        Arc<AtomicBool>,
672        crossbeam_channel::Receiver<PacketBatch>,
673        SocketAddr,
674    ) {
675        let s = bind_to_localhost().unwrap();
676        let exit = Arc::new(AtomicBool::new(false));
677        let (sender, receiver) = unbounded();
678        let keypair = Keypair::new();
679        let server_address = s.local_addr().unwrap();
680        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
681        let SpawnServerResult {
682            endpoints: _,
683            thread: t,
684            key_updater: _,
685        } = spawn_server(
686            "solQuicTest",
687            "quic_streamer_test",
688            s,
689            &keypair,
690            sender,
691            exit.clone(),
692            staked_nodes,
693            QuicServerParams {
694                coalesce_channel_size: 100_000, // smaller channel size for faster test
695                ..Default::default()
696            },
697        )
698        .unwrap();
699        (t, exit, receiver, server_address)
700    }
701
702    #[test]
703    fn test_quic_server_exit() {
704        let (t, exit, _receiver, _server_address) = setup_quic_server();
705        exit.store(true, Ordering::Relaxed);
706        t.join().unwrap();
707    }
708
709    #[test]
710    fn test_quic_timeout() {
711        solana_logger::setup();
712        let (t, exit, receiver, server_address) = setup_quic_server();
713        let runtime = rt("solQuicTestRt".to_string());
714        runtime.block_on(check_timeout(receiver, server_address));
715        exit.store(true, Ordering::Relaxed);
716        t.join().unwrap();
717    }
718
719    #[test]
720    fn test_quic_server_block_multiple_connections() {
721        solana_logger::setup();
722        let (t, exit, _receiver, server_address) = setup_quic_server();
723
724        let runtime = rt("solQuicTestRt".to_string());
725        runtime.block_on(check_block_multiple_connections(server_address));
726        exit.store(true, Ordering::Relaxed);
727        t.join().unwrap();
728    }
729
730    #[test]
731    fn test_quic_server_multiple_streams() {
732        solana_logger::setup();
733        let s = bind_to_localhost().unwrap();
734        let exit = Arc::new(AtomicBool::new(false));
735        let (sender, receiver) = unbounded();
736        let keypair = Keypair::new();
737        let server_address = s.local_addr().unwrap();
738        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
739        let SpawnServerResult {
740            endpoints: _,
741            thread: t,
742            key_updater: _,
743        } = spawn_server(
744            "solQuicTest",
745            "quic_streamer_test",
746            s,
747            &keypair,
748            sender,
749            exit.clone(),
750            staked_nodes,
751            QuicServerParams {
752                max_connections_per_peer: 2,
753                coalesce_channel_size: 100_000, // smaller channel size for faster test
754                ..QuicServerParams::default()
755            },
756        )
757        .unwrap();
758
759        let runtime = rt("solQuicTestRt".to_string());
760        runtime.block_on(check_multiple_streams(receiver, server_address, None));
761        exit.store(true, Ordering::Relaxed);
762        t.join().unwrap();
763    }
764
765    #[test]
766    fn test_quic_server_multiple_writes() {
767        solana_logger::setup();
768        let (t, exit, receiver, server_address) = setup_quic_server();
769
770        let runtime = rt("solQuicTestRt".to_string());
771        runtime.block_on(check_multiple_writes(receiver, server_address, None));
772        exit.store(true, Ordering::Relaxed);
773        t.join().unwrap();
774    }
775
776    #[test]
777    fn test_quic_server_unstaked_node_connect_failure() {
778        solana_logger::setup();
779        let s = bind_to_localhost().unwrap();
780        let exit = Arc::new(AtomicBool::new(false));
781        let (sender, _) = unbounded();
782        let keypair = Keypair::new();
783        let server_address = s.local_addr().unwrap();
784        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
785        let SpawnServerResult {
786            endpoints: _,
787            thread: t,
788            key_updater: _,
789        } = spawn_server(
790            "solQuicTest",
791            "quic_streamer_test",
792            s,
793            &keypair,
794            sender,
795            exit.clone(),
796            staked_nodes,
797            QuicServerParams {
798                max_unstaked_connections: 0,
799                coalesce_channel_size: 100_000, // smaller channel size for faster test
800                ..QuicServerParams::default()
801            },
802        )
803        .unwrap();
804
805        let runtime = rt("solQuicTestRt".to_string());
806        runtime.block_on(check_unstaked_node_connect_failure(server_address));
807        exit.store(true, Ordering::Relaxed);
808        t.join().unwrap();
809    }
810
811    #[test]
812    fn test_inline_tpu_coalesce() {
813        assert_eq!(DEFAULT_TPU_COALESCE, solana_sdk::net::DEFAULT_TPU_COALESCE);
814    }
815}