1use {
2 crate::{
3 nonblocking::quic::ALPN_TPU_PROTOCOL_ID, streamer::StakedNodes,
4 tls_certificates::new_self_signed_tls_certificate_chain,
5 },
6 crossbeam_channel::Sender,
7 pem::Pem,
8 quinn::{IdleTimeout, ServerConfig, VarInt},
9 rustls::{server::ClientCertVerified, Certificate, DistinguishedNames},
10 solana_perf::packet::PacketBatch,
11 solana_sdk::{
12 packet::PACKET_DATA_SIZE,
13 quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
14 signature::Keypair,
15 },
16 std::{
17 net::{IpAddr, UdpSocket},
18 sync::{
19 atomic::{AtomicBool, AtomicUsize, Ordering},
20 Arc, RwLock,
21 },
22 thread,
23 time::SystemTime,
24 },
25 tokio::runtime::{Builder, Runtime},
26};
27
/// Connection limit for staked peers. In this file it is only passed to
/// `spawn_server` as `max_staked_connections` by the tests.
pub const MAX_STAKED_CONNECTIONS: usize = 2_000;
/// Connection limit for unstaked peers. In this file it is only passed to
/// `spawn_server` as `max_unstaked_connections` by the tests.
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
/// Number of worker threads given to the tokio runtime built by `rt()`.
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 4;
31
/// Stateless marker type; its `rustls::server::ClientCertVerifier` impl in this
/// file accepts any client certificate without inspecting it.
struct SkipClientVerification;

impl SkipClientVerification {
    /// Returns a fresh verifier wrapped in an `Arc`, which is the form
    /// `configure_server` hands to `with_client_cert_verifier`.
    pub fn new() -> Arc<Self> {
        let verifier = SkipClientVerification;
        Arc::new(verifier)
    }
}
39
40impl rustls::server::ClientCertVerifier for SkipClientVerification {
41 fn client_auth_root_subjects(&self) -> Option<DistinguishedNames> {
42 Some(DistinguishedNames::new())
43 }
44
45 fn verify_client_cert(
46 &self,
47 _end_entity: &Certificate,
48 _intermediates: &[Certificate],
49 _now: SystemTime,
50 ) -> Result<ClientCertVerified, rustls::Error> {
51 Ok(rustls::server::ClientCertVerified::assertion())
52 }
53}
54
55#[allow(clippy::field_reassign_with_default)] pub(crate) fn configure_server(
58 identity_keypair: &Keypair,
59 gossip_host: IpAddr,
60) -> Result<(ServerConfig, String), QuicServerError> {
61 let (cert_chain, priv_key) =
62 new_self_signed_tls_certificate_chain(identity_keypair, gossip_host)
63 .map_err(|_e| QuicServerError::ConfigureFailed)?;
64 let cert_chain_pem_parts: Vec<Pem> = cert_chain
65 .iter()
66 .map(|cert| Pem {
67 tag: "CERTIFICATE".to_string(),
68 contents: cert.0.clone(),
69 })
70 .collect();
71 let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
72
73 let mut server_tls_config = rustls::ServerConfig::builder()
74 .with_safe_defaults()
75 .with_client_cert_verifier(SkipClientVerification::new())
76 .with_single_cert(cert_chain, priv_key)
77 .map_err(|_e| QuicServerError::ConfigureFailed)?;
78 server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
79
80 let mut server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
81 let config = Arc::get_mut(&mut server_config.transport).unwrap();
82
83 const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32;
85 config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
86 config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
87 config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
88 let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
89 config.max_idle_timeout(Some(timeout));
90
91 const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
93 config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
94 config.datagram_receive_buffer_size(None);
95
96 Ok((server_config, cert_chain_pem))
97}
98
99fn rt() -> Runtime {
100 Builder::new_multi_thread()
101 .worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS)
102 .enable_all()
103 .build()
104 .unwrap()
105}
106
107#[derive(thiserror::Error, Debug)]
108pub enum QuicServerError {
109 #[error("Server configure failed")]
110 ConfigureFailed,
111
112 #[error("Endpoint creation failed")]
113 EndpointFailed,
114}
115
/// Atomic counters for the QUIC server, handed to `spawn_server` behind an
/// `Arc` and turned into a metrics datapoint by `StreamStats::report`.
///
/// All fields start at zero (`Default`). The code that increments them is not
/// in this file.
#[derive(Default)]
pub struct StreamStats {
    // Read with `load` by `report`: the values persist across reports.
    pub(crate) total_connections: AtomicUsize,
    pub(crate) total_streams: AtomicUsize,

    // Every field below is read with `swap(0, ..)` by `report`, so it holds
    // whatever accumulated since the last report that read it.

    // New connections / streams and evictions.
    pub(crate) total_new_connections: AtomicUsize,
    pub(crate) total_new_streams: AtomicUsize,
    pub(crate) num_evictions: AtomicUsize,

    // Connection admission outcomes.
    pub(crate) connection_added_from_staked_peer: AtomicUsize,
    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
    pub(crate) connection_add_failed: AtomicUsize,
    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
    pub(crate) connection_add_failed_on_pruning: AtomicUsize,

    // Connection setup and removal.
    pub(crate) connection_setup_timeout: AtomicUsize,
    pub(crate) connection_setup_error: AtomicUsize,
    pub(crate) connection_removed: AtomicUsize,
    pub(crate) connection_remove_failed: AtomicUsize,

    // Chunk and packet handling.
    pub(crate) total_invalid_chunks: AtomicUsize,
    pub(crate) total_invalid_chunk_size: AtomicUsize,
    pub(crate) total_packets_allocated: AtomicUsize,
    pub(crate) total_chunks_received: AtomicUsize,
    pub(crate) total_packet_batch_send_err: AtomicUsize,
    pub(crate) total_packet_batches_sent: AtomicUsize,
    pub(crate) total_packet_batches_none: AtomicUsize,

    // Stream read failures.
    pub(crate) total_stream_read_errors: AtomicUsize,
    pub(crate) total_stream_read_timeouts: AtomicUsize,
}
143
144impl StreamStats {
145 pub fn report(&self) {
146 datapoint_info!(
147 "quic-connections",
148 (
149 "active_connections",
150 self.total_connections.load(Ordering::Relaxed),
151 i64
152 ),
153 (
154 "active_streams",
155 self.total_streams.load(Ordering::Relaxed),
156 i64
157 ),
158 (
159 "new_connections",
160 self.total_new_connections.swap(0, Ordering::Relaxed),
161 i64
162 ),
163 (
164 "new_streams",
165 self.total_new_streams.swap(0, Ordering::Relaxed),
166 i64
167 ),
168 (
169 "evictions",
170 self.num_evictions.swap(0, Ordering::Relaxed),
171 i64
172 ),
173 (
174 "connection_added_from_staked_peer",
175 self.connection_added_from_staked_peer
176 .swap(0, Ordering::Relaxed),
177 i64
178 ),
179 (
180 "connection_added_from_unstaked_peer",
181 self.connection_added_from_unstaked_peer
182 .swap(0, Ordering::Relaxed),
183 i64
184 ),
185 (
186 "connection_add_failed",
187 self.connection_add_failed.swap(0, Ordering::Relaxed),
188 i64
189 ),
190 (
191 "connection_add_failed_invalid_stream_count",
192 self.connection_add_failed_invalid_stream_count
193 .swap(0, Ordering::Relaxed),
194 i64
195 ),
196 (
197 "connection_add_failed_unstaked_node",
198 self.connection_add_failed_unstaked_node
199 .swap(0, Ordering::Relaxed),
200 i64
201 ),
202 (
203 "connection_add_failed_on_pruning",
204 self.connection_add_failed_on_pruning
205 .swap(0, Ordering::Relaxed),
206 i64
207 ),
208 (
209 "connection_removed",
210 self.connection_removed.swap(0, Ordering::Relaxed),
211 i64
212 ),
213 (
214 "connection_remove_failed",
215 self.connection_remove_failed.swap(0, Ordering::Relaxed),
216 i64
217 ),
218 (
219 "connection_setup_timeout",
220 self.connection_setup_timeout.swap(0, Ordering::Relaxed),
221 i64
222 ),
223 (
224 "connection_setup_error",
225 self.connection_setup_error.swap(0, Ordering::Relaxed),
226 i64
227 ),
228 (
229 "invalid_chunk",
230 self.total_invalid_chunks.swap(0, Ordering::Relaxed),
231 i64
232 ),
233 (
234 "invalid_chunk_size",
235 self.total_invalid_chunk_size.swap(0, Ordering::Relaxed),
236 i64
237 ),
238 (
239 "packets_allocated",
240 self.total_packets_allocated.swap(0, Ordering::Relaxed),
241 i64
242 ),
243 (
244 "chunks_received",
245 self.total_chunks_received.swap(0, Ordering::Relaxed),
246 i64
247 ),
248 (
249 "packet_batch_send_error",
250 self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
251 i64
252 ),
253 (
254 "packet_batches_sent",
255 self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
256 i64
257 ),
258 (
259 "packet_batch_empty",
260 self.total_packet_batches_none.swap(0, Ordering::Relaxed),
261 i64
262 ),
263 (
264 "stream_read_errors",
265 self.total_stream_read_errors.swap(0, Ordering::Relaxed),
266 i64
267 ),
268 (
269 "stream_read_timeouts",
270 self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
271 i64
272 ),
273 );
274 }
275}
276
277#[allow(clippy::too_many_arguments)]
278pub fn spawn_server(
279 sock: UdpSocket,
280 keypair: &Keypair,
281 gossip_host: IpAddr,
282 packet_sender: Sender<PacketBatch>,
283 exit: Arc<AtomicBool>,
284 max_connections_per_peer: usize,
285 staked_nodes: Arc<RwLock<StakedNodes>>,
286 max_staked_connections: usize,
287 max_unstaked_connections: usize,
288 stats: Arc<StreamStats>,
289) -> Result<thread::JoinHandle<()>, QuicServerError> {
290 let runtime = rt();
291 let task = {
292 let _guard = runtime.enter();
293 crate::nonblocking::quic::spawn_server(
294 sock,
295 keypair,
296 gossip_host,
297 packet_sender,
298 exit,
299 max_connections_per_peer,
300 staked_nodes,
301 max_staked_connections,
302 max_unstaked_connections,
303 stats,
304 )
305 }?;
306 let handle = thread::Builder::new()
307 .name("solQuicServer".into())
308 .spawn(move || {
309 if let Err(e) = runtime.block_on(task) {
310 warn!("error from runtime.block_on: {:?}", e);
311 }
312 })
313 .unwrap();
314 Ok(handle)
315}
316
#[cfg(test)]
mod test {
    use {
        super::*, crate::nonblocking::quic::test::*, crossbeam_channel::unbounded,
        std::net::SocketAddr,
    };

    /// Binds a loopback socket on an ephemeral port and starts a server on it
    /// with a fresh keypair, empty staked-node set and fresh stats.
    ///
    /// Returns the server thread, its exit flag, the packet receiver and the
    /// address the server listens on.
    fn spawn_test_server(
        max_connections_per_peer: usize,
        max_unstaked_connections: usize,
    ) -> (
        std::thread::JoinHandle<()>,
        Arc<AtomicBool>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
    ) {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let (packet_tx, packet_rx) = unbounded();
        let identity = Keypair::new();
        let host = "127.0.0.1".parse::<IpAddr>().unwrap();
        let server_address = socket.local_addr().unwrap();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let stats = Arc::new(StreamStats::default());
        let handle = spawn_server(
            socket,
            &identity,
            host,
            packet_tx,
            exit.clone(),
            max_connections_per_peer,
            staked_nodes,
            MAX_STAKED_CONNECTIONS,
            max_unstaked_connections,
            stats,
        )
        .unwrap();
        (handle, exit, packet_rx, server_address)
    }

    /// Default fixture: one connection per peer, default unstaked limit.
    fn setup_quic_server() -> (
        std::thread::JoinHandle<()>,
        Arc<AtomicBool>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
    ) {
        spawn_test_server(1, MAX_UNSTAKED_CONNECTIONS)
    }

    /// Raises the exit flag and waits for the server thread to finish.
    fn shut_down(exit: Arc<AtomicBool>, handle: std::thread::JoinHandle<()>) {
        exit.store(true, Ordering::Relaxed);
        handle.join().unwrap();
    }

    #[test]
    fn test_quic_server_exit() {
        let (handle, exit, _receiver, _server_address) = setup_quic_server();
        shut_down(exit, handle);
    }

    #[test]
    fn test_quic_timeout() {
        solana_logger::setup();
        let (handle, exit, receiver, server_address) = setup_quic_server();
        let runtime = rt();
        runtime.block_on(check_timeout(receiver, server_address));
        shut_down(exit, handle);
    }

    #[test]
    fn test_quic_server_block_multiple_connections() {
        solana_logger::setup();
        let (handle, exit, _receiver, server_address) = setup_quic_server();

        let runtime = rt();
        runtime.block_on(check_block_multiple_connections(server_address));
        shut_down(exit, handle);
    }

    #[test]
    fn test_quic_server_multiple_streams() {
        solana_logger::setup();
        // Two connections per peer are allowed for this scenario.
        let (handle, exit, receiver, server_address) =
            spawn_test_server(2, MAX_UNSTAKED_CONNECTIONS);

        let runtime = rt();
        runtime.block_on(check_multiple_streams(receiver, server_address));
        shut_down(exit, handle);
    }

    #[test]
    fn test_quic_server_multiple_writes() {
        solana_logger::setup();
        let (handle, exit, receiver, server_address) = setup_quic_server();

        let runtime = rt();
        runtime.block_on(check_multiple_writes(receiver, server_address, None));
        shut_down(exit, handle);
    }

    #[test]
    fn test_quic_server_unstaked_node_connect_failure() {
        solana_logger::setup();
        // No unstaked connections are permitted. The `_` pattern leaves the
        // receiver unbound, so it is dropped as soon as this statement ends.
        let (handle, exit, _, server_address) = spawn_test_server(1, 0);

        let runtime = rt();
        runtime.block_on(check_unstaked_node_connect_failure(server_address));
        shut_down(exit, handle);
    }
}