1use {
2 crate::{
3 nonblocking::quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
4 streamer::StakedNodes,
5 },
6 crossbeam_channel::Sender,
7 pem::Pem,
8 quinn::{
9 crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
10 Endpoint, IdleTimeout, ServerConfig,
11 },
12 rustls::KeyLogFile,
13 solana_keypair::Keypair,
14 solana_packet::PACKET_DATA_SIZE,
15 solana_perf::packet::PacketBatch,
16 solana_quic_definitions::{
17 NotifyKeyUpdate, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
18 },
19 solana_tls_utils::{new_dummy_x509_certificate, tls_server_config_builder},
20 std::{
21 net::UdpSocket,
22 sync::{
23 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
24 Arc, Mutex, RwLock,
25 },
26 thread,
27 time::Duration,
28 },
29 tokio::runtime::Runtime,
30};
31
/// Default limit on QUIC connections from a single peer.
///
/// NOTE(review): `QuicServerParams::default()` in this file sets
/// `max_connections_per_peer` to 1 rather than to this constant; confirm
/// which callers are expected to use this value.
pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;

/// Default for `QuicServerParams::max_staked_connections`.
pub const DEFAULT_MAX_STAKED_CONNECTIONS: usize = 2000;

/// Legacy name of `DEFAULT_MAX_STAKED_CONNECTIONS`, kept for backward compatibility.
#[deprecated(
    since = "2.2.0",
    note = "Use solana_streamer::quic::DEFAULT_MAX_STAKED_CONNECTIONS"
)]
pub const MAX_STAKED_CONNECTIONS: usize = DEFAULT_MAX_STAKED_CONNECTIONS;

/// Default for `QuicServerParams::max_unstaked_connections`.
pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 500;

/// Legacy name of `DEFAULT_MAX_UNSTAKED_CONNECTIONS`, kept for backward compatibility.
#[deprecated(
    since = "2.2.0",
    note = "Use solana_streamer::quic::DEFAULT_MAX_UNSTAKED_CONNECTIONS"
)]
pub const MAX_UNSTAKED_CONNECTIONS: usize = DEFAULT_MAX_UNSTAKED_CONNECTIONS;

/// Default for `QuicServerParams::max_streams_per_ms`.
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

/// Default for `QuicServerParams::max_connections_per_ipaddr_per_min`.
pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;

/// Default number of QUIC endpoints (not referenced elsewhere in this part of the file).
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;

/// Default for `QuicServerParams::coalesce`.
pub(crate) const DEFAULT_TPU_COALESCE: Duration = Duration::from_millis(5);
63
/// Handles returned to the caller once a QUIC server has been spawned.
pub struct SpawnServerResult {
    /// QUIC endpoints created for the server.
    pub endpoints: Vec<Endpoint>,
    /// OS thread that blocks on the server's async task until it completes.
    pub thread: thread::JoinHandle<()>,
    /// Applies a new keypair to the endpoints via `NotifyKeyUpdate::update_key`.
    pub key_updater: Arc<EndpointKeyUpdater>,
}
69
/// Default for `QuicServerParams::coalesce_channel_size`.
pub(crate) const DEFAULT_MAX_COALESCE_CHANNEL_SIZE: usize = 1_000_000;
72
73#[allow(clippy::field_reassign_with_default)] pub(crate) fn configure_server(
76 identity_keypair: &Keypair,
77) -> Result<(ServerConfig, String), QuicServerError> {
78 let (cert, priv_key) = new_dummy_x509_certificate(identity_keypair);
79 let cert_chain_pem_parts = vec![Pem {
80 tag: "CERTIFICATE".to_string(),
81 contents: cert.as_ref().to_vec(),
82 }];
83 let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
84
85 let mut server_tls_config =
86 tls_server_config_builder().with_single_cert(vec![cert], priv_key)?;
87 server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
88 server_tls_config.key_log = Arc::new(KeyLogFile::new());
89 let quic_server_config = QuicServerConfig::try_from(server_tls_config)?;
90
91 let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
92 let config = Arc::get_mut(&mut server_config.transport).unwrap();
93
94 const MAX_CONCURRENT_UNI_STREAMS: u32 =
96 (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS.saturating_mul(2)) as u32;
97 config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
98 config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
99 config.receive_window((PACKET_DATA_SIZE as u32).into());
100 let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
101 config.max_idle_timeout(Some(timeout));
102
103 const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
105 config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
106 config.datagram_receive_buffer_size(None);
107
108 config.enable_segmentation_offload(false);
113
114 Ok((server_config, cert_chain_pem))
115}
116
117pub fn rt(name: String) -> Runtime {
118 tokio::runtime::Builder::new_multi_thread()
119 .thread_name(name)
120 .enable_all()
121 .build()
122 .unwrap()
123}
124
/// Errors that can occur while configuring or starting the QUIC server.
#[derive(thiserror::Error, Debug)]
pub enum QuicServerError {
    /// An I/O error was reported while creating an endpoint.
    #[error("Endpoint creation failed: {0}")]
    EndpointFailed(std::io::Error),
    /// rustls reported an error (converted automatically via `From`).
    #[error("TLS error: {0}")]
    TlsError(#[from] rustls::Error),
    /// The TLS config could not be converted into a QUIC crypto config
    /// (converted automatically via `From`).
    #[error("No initial cipher suite")]
    NoInitialCipherSuite(#[from] NoInitialCipherSuite),
}
134
/// Holds the endpoints whose server config is replaced when the identity
/// keypair is updated through `NotifyKeyUpdate::update_key`.
pub struct EndpointKeyUpdater {
    /// Endpoints to reconfigure on a key update.
    endpoints: Vec<Endpoint>,
}
138
139impl NotifyKeyUpdate for EndpointKeyUpdater {
140 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
141 let (config, _) = configure_server(key)?;
142 for endpoint in &self.endpoints {
143 endpoint.set_server_config(Some(config.clone()));
144 }
145 Ok(())
146 }
147}
148
/// Counters and timing samples collected by the QUIC streamer.
///
/// All counters are atomics so they can be updated through a shared reference;
/// `StreamerStats::report` publishes them as a single datapoint.
#[derive(Default)]
pub struct StreamerStats {
    // Connection / stream totals.
    pub(crate) total_connections: AtomicUsize,
    pub(crate) total_new_connections: AtomicUsize,
    pub(crate) total_streams: AtomicUsize,
    pub(crate) total_new_streams: AtomicUsize,
    pub(crate) invalid_stream_size: AtomicUsize,
    // Packet, chunk and batch accounting.
    pub(crate) total_packets_allocated: AtomicUsize,
    pub(crate) total_packet_batches_allocated: AtomicUsize,
    pub(crate) total_chunks_received: AtomicUsize,
    pub(crate) total_staked_chunks_received: AtomicUsize,
    pub(crate) total_unstaked_chunks_received: AtomicUsize,
    pub(crate) total_packet_batch_send_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
    pub(crate) total_packet_batches_sent: AtomicUsize,
    pub(crate) total_packet_batches_none: AtomicUsize,
    pub(crate) total_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
    pub(crate) total_chunks_sent_for_batching: AtomicUsize,
    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
    pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
    pub(crate) total_stream_read_errors: AtomicUsize,
    pub(crate) total_stream_read_timeouts: AtomicUsize,
    // Connection table changes and failures.
    pub(crate) num_evictions: AtomicUsize,
    pub(crate) connection_added_from_staked_peer: AtomicUsize,
    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
    pub(crate) connection_add_failed: AtomicUsize,
    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
    pub(crate) connection_add_failed_staked_node: AtomicUsize,
    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
    pub(crate) connection_add_failed_on_pruning: AtomicUsize,
    // Connection setup outcomes.
    pub(crate) connection_setup_timeout: AtomicUsize,
    pub(crate) connection_setup_error: AtomicUsize,
    pub(crate) connection_setup_error_closed: AtomicUsize,
    pub(crate) connection_setup_error_timed_out: AtomicUsize,
    pub(crate) connection_setup_error_transport: AtomicUsize,
    pub(crate) connection_setup_error_app_closed: AtomicUsize,
    pub(crate) connection_setup_error_reset: AtomicUsize,
    pub(crate) connection_setup_error_locally_closed: AtomicUsize,
    pub(crate) connection_removed: AtomicUsize,
    pub(crate) connection_remove_failed: AtomicUsize,
    // Rate limiting and throttling.
    pub(crate) connection_rate_limited_across_all: AtomicUsize,
    pub(crate) connection_rate_limited_per_ipaddr: AtomicUsize,
    pub(crate) throttled_streams: AtomicUsize,
    pub(crate) stream_load_ema: AtomicUsize,
    pub(crate) stream_load_ema_overflow: AtomicUsize,
    pub(crate) stream_load_capacity_overflow: AtomicUsize,
    // Histogram of sampled processing times; cloned and cleared on every report.
    pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
    pub(crate) perf_track_overhead_us: AtomicU64,
    pub(crate) total_staked_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
    pub(crate) throttled_staked_streams: AtomicUsize,
    pub(crate) throttled_unstaked_streams: AtomicUsize,
    pub(crate) connection_rate_limiter_length: AtomicUsize,
    pub(crate) open_connections: AtomicUsize,
    pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
    pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
    pub(crate) total_incoming_connection_attempts: AtomicUsize,
    pub(crate) quic_endpoints_count: AtomicUsize,
}
215
216impl StreamerStats {
217 pub fn report(&self, name: &'static str) {
218 let process_sampled_packets_us_hist = {
219 let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
220 let process_sampled_packets_us_hist = metrics.clone();
221 metrics.clear();
222 process_sampled_packets_us_hist
223 };
224
225 datapoint_info!(
226 name,
227 (
228 "active_connections",
229 self.total_connections.load(Ordering::Relaxed),
230 i64
231 ),
232 (
233 "active_streams",
234 self.total_streams.load(Ordering::Relaxed),
235 i64
236 ),
237 (
238 "new_connections",
239 self.total_new_connections.swap(0, Ordering::Relaxed),
240 i64
241 ),
242 (
243 "new_streams",
244 self.total_new_streams.swap(0, Ordering::Relaxed),
245 i64
246 ),
247 (
248 "evictions",
249 self.num_evictions.swap(0, Ordering::Relaxed),
250 i64
251 ),
252 (
253 "connection_added_from_staked_peer",
254 self.connection_added_from_staked_peer
255 .swap(0, Ordering::Relaxed),
256 i64
257 ),
258 (
259 "connection_added_from_unstaked_peer",
260 self.connection_added_from_unstaked_peer
261 .swap(0, Ordering::Relaxed),
262 i64
263 ),
264 (
265 "connection_add_failed",
266 self.connection_add_failed.swap(0, Ordering::Relaxed),
267 i64
268 ),
269 (
270 "connection_add_failed_invalid_stream_count",
271 self.connection_add_failed_invalid_stream_count
272 .swap(0, Ordering::Relaxed),
273 i64
274 ),
275 (
276 "connection_add_failed_staked_node",
277 self.connection_add_failed_staked_node
278 .swap(0, Ordering::Relaxed),
279 i64
280 ),
281 (
282 "connection_add_failed_unstaked_node",
283 self.connection_add_failed_unstaked_node
284 .swap(0, Ordering::Relaxed),
285 i64
286 ),
287 (
288 "connection_add_failed_on_pruning",
289 self.connection_add_failed_on_pruning
290 .swap(0, Ordering::Relaxed),
291 i64
292 ),
293 (
294 "connection_removed",
295 self.connection_removed.swap(0, Ordering::Relaxed),
296 i64
297 ),
298 (
299 "connection_remove_failed",
300 self.connection_remove_failed.swap(0, Ordering::Relaxed),
301 i64
302 ),
303 (
304 "connection_setup_timeout",
305 self.connection_setup_timeout.swap(0, Ordering::Relaxed),
306 i64
307 ),
308 (
309 "connection_setup_error",
310 self.connection_setup_error.swap(0, Ordering::Relaxed),
311 i64
312 ),
313 (
314 "connection_setup_error_timed_out",
315 self.connection_setup_error_timed_out
316 .swap(0, Ordering::Relaxed),
317 i64
318 ),
319 (
320 "connection_setup_error_closed",
321 self.connection_setup_error_closed
322 .swap(0, Ordering::Relaxed),
323 i64
324 ),
325 (
326 "connection_setup_error_transport",
327 self.connection_setup_error_transport
328 .swap(0, Ordering::Relaxed),
329 i64
330 ),
331 (
332 "connection_setup_error_app_closed",
333 self.connection_setup_error_app_closed
334 .swap(0, Ordering::Relaxed),
335 i64
336 ),
337 (
338 "connection_setup_error_reset",
339 self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
340 i64
341 ),
342 (
343 "connection_setup_error_locally_closed",
344 self.connection_setup_error_locally_closed
345 .swap(0, Ordering::Relaxed),
346 i64
347 ),
348 (
349 "connection_rate_limited_across_all",
350 self.connection_rate_limited_across_all
351 .swap(0, Ordering::Relaxed),
352 i64
353 ),
354 (
355 "connection_rate_limited_per_ipaddr",
356 self.connection_rate_limited_per_ipaddr
357 .swap(0, Ordering::Relaxed),
358 i64
359 ),
360 (
361 "invalid_stream_size",
362 self.invalid_stream_size.swap(0, Ordering::Relaxed),
363 i64
364 ),
365 (
366 "packets_allocated",
367 self.total_packets_allocated.swap(0, Ordering::Relaxed),
368 i64
369 ),
370 (
371 "packet_batches_allocated",
372 self.total_packet_batches_allocated
373 .swap(0, Ordering::Relaxed),
374 i64
375 ),
376 (
377 "packets_sent_for_batching",
378 self.total_packets_sent_for_batching
379 .swap(0, Ordering::Relaxed),
380 i64
381 ),
382 (
383 "staked_packets_sent_for_batching",
384 self.total_staked_packets_sent_for_batching
385 .swap(0, Ordering::Relaxed),
386 i64
387 ),
388 (
389 "unstaked_packets_sent_for_batching",
390 self.total_unstaked_packets_sent_for_batching
391 .swap(0, Ordering::Relaxed),
392 i64
393 ),
394 (
395 "bytes_sent_for_batching",
396 self.total_bytes_sent_for_batching
397 .swap(0, Ordering::Relaxed),
398 i64
399 ),
400 (
401 "chunks_sent_for_batching",
402 self.total_chunks_sent_for_batching
403 .swap(0, Ordering::Relaxed),
404 i64
405 ),
406 (
407 "packets_sent_to_consumer",
408 self.total_packets_sent_to_consumer
409 .swap(0, Ordering::Relaxed),
410 i64
411 ),
412 (
413 "bytes_sent_to_consumer",
414 self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
415 i64
416 ),
417 (
418 "chunks_processed_by_batcher",
419 self.total_chunks_processed_by_batcher
420 .swap(0, Ordering::Relaxed),
421 i64
422 ),
423 (
424 "chunks_received",
425 self.total_chunks_received.swap(0, Ordering::Relaxed),
426 i64
427 ),
428 (
429 "staked_chunks_received",
430 self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
431 i64
432 ),
433 (
434 "unstaked_chunks_received",
435 self.total_unstaked_chunks_received
436 .swap(0, Ordering::Relaxed),
437 i64
438 ),
439 (
440 "packet_batch_send_error",
441 self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
442 i64
443 ),
444 (
445 "handle_chunk_to_packet_batcher_send_error",
446 self.total_handle_chunk_to_packet_batcher_send_err
447 .swap(0, Ordering::Relaxed),
448 i64
449 ),
450 (
451 "packet_batches_sent",
452 self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
453 i64
454 ),
455 (
456 "packet_batch_empty",
457 self.total_packet_batches_none.swap(0, Ordering::Relaxed),
458 i64
459 ),
460 (
461 "stream_read_errors",
462 self.total_stream_read_errors.swap(0, Ordering::Relaxed),
463 i64
464 ),
465 (
466 "stream_read_timeouts",
467 self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
468 i64
469 ),
470 (
471 "throttled_streams",
472 self.throttled_streams.swap(0, Ordering::Relaxed),
473 i64
474 ),
475 (
476 "stream_load_ema",
477 self.stream_load_ema.load(Ordering::Relaxed),
478 i64
479 ),
480 (
481 "stream_load_ema_overflow",
482 self.stream_load_ema_overflow.load(Ordering::Relaxed),
483 i64
484 ),
485 (
486 "stream_load_capacity_overflow",
487 self.stream_load_capacity_overflow.load(Ordering::Relaxed),
488 i64
489 ),
490 (
491 "throttled_unstaked_streams",
492 self.throttled_unstaked_streams.swap(0, Ordering::Relaxed),
493 i64
494 ),
495 (
496 "throttled_staked_streams",
497 self.throttled_staked_streams.swap(0, Ordering::Relaxed),
498 i64
499 ),
500 (
501 "process_sampled_packets_us_90pct",
502 process_sampled_packets_us_hist
503 .percentile(90.0)
504 .unwrap_or(0),
505 i64
506 ),
507 (
508 "process_sampled_packets_us_min",
509 process_sampled_packets_us_hist.minimum().unwrap_or(0),
510 i64
511 ),
512 (
513 "process_sampled_packets_us_max",
514 process_sampled_packets_us_hist.maximum().unwrap_or(0),
515 i64
516 ),
517 (
518 "process_sampled_packets_us_mean",
519 process_sampled_packets_us_hist.mean().unwrap_or(0),
520 i64
521 ),
522 (
523 "process_sampled_packets_count",
524 process_sampled_packets_us_hist.entries(),
525 i64
526 ),
527 (
528 "perf_track_overhead_us",
529 self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
530 i64
531 ),
532 (
533 "connection_rate_limiter_length",
534 self.connection_rate_limiter_length.load(Ordering::Relaxed),
535 i64
536 ),
537 (
538 "outstanding_incoming_connection_attempts",
539 self.outstanding_incoming_connection_attempts
540 .load(Ordering::Relaxed),
541 i64
542 ),
543 (
544 "total_incoming_connection_attempts",
545 self.total_incoming_connection_attempts
546 .load(Ordering::Relaxed),
547 i64
548 ),
549 (
550 "quic_endpoints_count",
551 self.quic_endpoints_count.load(Ordering::Relaxed),
552 i64
553 ),
554 (
555 "open_connections",
556 self.open_connections.load(Ordering::Relaxed),
557 i64
558 ),
559 (
560 "refused_connections_too_many_open_connections",
561 self.refused_connections_too_many_open_connections
562 .swap(0, Ordering::Relaxed),
563 i64
564 ),
565 );
566 }
567}
568
569pub fn spawn_server(
570 thread_name: &'static str,
571 metrics_name: &'static str,
572 socket: UdpSocket,
573 keypair: &Keypair,
574 packet_sender: Sender<PacketBatch>,
575 exit: Arc<AtomicBool>,
576 staked_nodes: Arc<RwLock<StakedNodes>>,
577 quic_server_params: QuicServerParams,
578) -> Result<SpawnServerResult, QuicServerError> {
579 spawn_server_multi(
580 thread_name,
581 metrics_name,
582 vec![socket],
583 keypair,
584 packet_sender,
585 exit,
586 staked_nodes,
587 quic_server_params,
588 )
589}
590
/// Tunable parameters passed to the QUIC server when it is spawned.
///
/// `Default` fills every field from the `DEFAULT_*` constants of this module,
/// except `max_connections_per_peer`, which defaults to the literal 1.
#[derive(Clone)]
pub struct QuicServerParams {
    /// Connection limit per peer (default: 1).
    pub max_connections_per_peer: usize,
    /// Connection limit for staked peers (default: `DEFAULT_MAX_STAKED_CONNECTIONS`).
    pub max_staked_connections: usize,
    /// Connection limit for unstaked peers (default: `DEFAULT_MAX_UNSTAKED_CONNECTIONS`).
    pub max_unstaked_connections: usize,
    /// Stream budget per millisecond (default: `DEFAULT_MAX_STREAMS_PER_MS`).
    pub max_streams_per_ms: u64,
    /// Connection budget per IP address per minute
    /// (default: `DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE`).
    pub max_connections_per_ipaddr_per_min: u64,
    /// Timeout while waiting for a chunk (default: `DEFAULT_WAIT_FOR_CHUNK_TIMEOUT`).
    pub wait_for_chunk_timeout: Duration,
    /// Coalescing duration (default: `DEFAULT_TPU_COALESCE`).
    pub coalesce: Duration,
    /// Size of the coalescing channel (default: `DEFAULT_MAX_COALESCE_CHANNEL_SIZE`).
    pub coalesce_channel_size: usize,
}
602
603impl Default for QuicServerParams {
604 fn default() -> Self {
605 QuicServerParams {
606 max_connections_per_peer: 1,
607 max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
608 max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
609 max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
610 max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
611 wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
612 coalesce: DEFAULT_TPU_COALESCE,
613 coalesce_channel_size: DEFAULT_MAX_COALESCE_CHANNEL_SIZE,
614 }
615 }
616}
617
618pub fn spawn_server_multi(
619 thread_name: &'static str,
620 metrics_name: &'static str,
621 sockets: Vec<UdpSocket>,
622 keypair: &Keypair,
623 packet_sender: Sender<PacketBatch>,
624 exit: Arc<AtomicBool>,
625 staked_nodes: Arc<RwLock<StakedNodes>>,
626 quic_server_params: QuicServerParams,
627) -> Result<SpawnServerResult, QuicServerError> {
628 let runtime = rt(format!("{thread_name}Rt"));
629 let result = {
630 let _guard = runtime.enter();
631 crate::nonblocking::quic::spawn_server_multi(
632 metrics_name,
633 sockets,
634 keypair,
635 packet_sender,
636 exit,
637 staked_nodes,
638 quic_server_params,
639 )
640 }?;
641 let handle = thread::Builder::new()
642 .name(thread_name.into())
643 .spawn(move || {
644 if let Err(e) = runtime.block_on(result.thread) {
645 warn!("error from runtime.block_on: {:?}", e);
646 }
647 })
648 .unwrap();
649 let updater = EndpointKeyUpdater {
650 endpoints: result.endpoints.clone(),
651 };
652 Ok(SpawnServerResult {
653 endpoints: result.endpoints,
654 thread: handle,
655 key_updater: Arc::new(updater),
656 })
657}
658
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
        crossbeam_channel::unbounded,
        solana_net_utils::bind_to_localhost,
        std::net::SocketAddr,
    };

    /// Binds a localhost socket and spawns a server on it that delivers
    /// packets to `sender`, configured with `params`.
    ///
    /// Returns the server thread, its exit flag and the bound address.
    fn spawn_test_server(
        sender: crossbeam_channel::Sender<PacketBatch>,
        params: QuicServerParams,
    ) -> (std::thread::JoinHandle<()>, Arc<AtomicBool>, SocketAddr) {
        let socket = bind_to_localhost().unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let server_address = socket.local_addr().unwrap();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let SpawnServerResult { thread, .. } = spawn_server(
            "solQuicTest",
            "quic_streamer_test",
            socket,
            &keypair,
            sender,
            exit.clone(),
            staked_nodes,
            params,
        )
        .unwrap();
        (thread, exit, server_address)
    }

    /// Signals the server to exit and waits for its thread to finish.
    fn shut_down(server_thread: std::thread::JoinHandle<()>, exit: Arc<AtomicBool>) {
        exit.store(true, Ordering::Relaxed);
        server_thread.join().unwrap();
    }

    /// Runtime used to drive the client side of each test.
    fn client_runtime() -> Runtime {
        rt("solQuicTestRt".to_string())
    }

    /// Spawns a server with default parameters apart from a smaller coalesce channel.
    fn setup_quic_server() -> (
        std::thread::JoinHandle<()>,
        Arc<AtomicBool>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
    ) {
        let (sender, receiver) = unbounded();
        let (server_thread, exit, server_address) = spawn_test_server(
            sender,
            QuicServerParams {
                coalesce_channel_size: 100_000,
                ..Default::default()
            },
        );
        (server_thread, exit, receiver, server_address)
    }

    #[test]
    fn test_quic_server_exit() {
        let (server_thread, exit, _receiver, _server_address) = setup_quic_server();
        shut_down(server_thread, exit);
    }

    #[test]
    fn test_quic_timeout() {
        solana_logger::setup();
        let (server_thread, exit, receiver, server_address) = setup_quic_server();
        let runtime = client_runtime();
        runtime.block_on(check_timeout(receiver, server_address));
        shut_down(server_thread, exit);
    }

    #[test]
    fn test_quic_server_block_multiple_connections() {
        solana_logger::setup();
        let (server_thread, exit, _receiver, server_address) = setup_quic_server();

        let runtime = client_runtime();
        runtime.block_on(check_block_multiple_connections(server_address));
        shut_down(server_thread, exit);
    }

    #[test]
    fn test_quic_server_multiple_streams() {
        solana_logger::setup();
        let (sender, receiver) = unbounded();
        let (server_thread, exit, server_address) = spawn_test_server(
            sender,
            QuicServerParams {
                max_connections_per_peer: 2,
                coalesce_channel_size: 100_000,
                ..QuicServerParams::default()
            },
        );

        let runtime = client_runtime();
        runtime.block_on(check_multiple_streams(receiver, server_address, None));
        shut_down(server_thread, exit);
    }

    #[test]
    fn test_quic_server_multiple_writes() {
        solana_logger::setup();
        let (server_thread, exit, receiver, server_address) = setup_quic_server();

        let runtime = client_runtime();
        runtime.block_on(check_multiple_writes(receiver, server_address, None));
        shut_down(server_thread, exit);
    }

    #[test]
    fn test_quic_server_unstaked_node_connect_failure() {
        solana_logger::setup();
        // The receiving half is dropped immediately; this test only checks
        // that the connection attempt is rejected.
        let (sender, _) = unbounded();
        let (server_thread, exit, server_address) = spawn_test_server(
            sender,
            QuicServerParams {
                max_unstaked_connections: 0,
                coalesce_channel_size: 100_000,
                ..QuicServerParams::default()
            },
        );

        let runtime = client_runtime();
        runtime.block_on(check_unstaked_node_connect_failure(server_address));
        shut_down(server_thread, exit);
    }

    #[test]
    fn test_inline_tpu_coalesce() {
        assert_eq!(DEFAULT_TPU_COALESCE, solana_sdk::net::DEFAULT_TPU_COALESCE);
    }
}