solana_streamer/
quic.rs

1use {
2    crate::{
3        nonblocking::quic::ALPN_TPU_PROTOCOL_ID, streamer::StakedNodes,
4        tls_certificates::new_self_signed_tls_certificate_chain,
5    },
6    crossbeam_channel::Sender,
7    pem::Pem,
8    quinn::{IdleTimeout, ServerConfig, VarInt},
9    rustls::{server::ClientCertVerified, Certificate, DistinguishedNames},
10    solana_perf::packet::PacketBatch,
11    solana_sdk::{
12        packet::PACKET_DATA_SIZE,
13        quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
14        signature::Keypair,
15    },
16    std::{
17        net::{IpAddr, UdpSocket},
18        sync::{
19            atomic::{AtomicBool, AtomicUsize, Ordering},
20            Arc, RwLock,
21        },
22        thread,
23        time::SystemTime,
24    },
25    tokio::runtime::{Builder, Runtime},
26};
27
/// Connection cap for staked peers; this file's tests pass it to `spawn_server`
/// as `max_staked_connections`.
pub const MAX_STAKED_CONNECTIONS: usize = 2_000;
/// Connection cap for unstaked peers; this file's tests pass it to `spawn_server`
/// as `max_unstaked_connections`.
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
/// Worker-thread count handed to the Tokio runtime builder in `rt`.
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 4;
31
/// Stateless marker type used as the server's client-certificate verifier.
struct SkipClientVerification;

impl SkipClientVerification {
    /// Creates a verifier already wrapped in an `Arc`, which is the form
    /// `configure_server` hands to the TLS config builder.
    pub fn new() -> Arc<Self> {
        Arc::new(SkipClientVerification)
    }
}
39
40impl rustls::server::ClientCertVerifier for SkipClientVerification {
41    fn client_auth_root_subjects(&self) -> Option<DistinguishedNames> {
42        Some(DistinguishedNames::new())
43    }
44
45    fn verify_client_cert(
46        &self,
47        _end_entity: &Certificate,
48        _intermediates: &[Certificate],
49        _now: SystemTime,
50    ) -> Result<ClientCertVerified, rustls::Error> {
51        Ok(rustls::server::ClientCertVerified::assertion())
52    }
53}
54
55/// Returns default server configuration along with its PEM certificate chain.
56#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
57pub(crate) fn configure_server(
58    identity_keypair: &Keypair,
59    gossip_host: IpAddr,
60) -> Result<(ServerConfig, String), QuicServerError> {
61    let (cert_chain, priv_key) =
62        new_self_signed_tls_certificate_chain(identity_keypair, gossip_host)
63            .map_err(|_e| QuicServerError::ConfigureFailed)?;
64    let cert_chain_pem_parts: Vec<Pem> = cert_chain
65        .iter()
66        .map(|cert| Pem {
67            tag: "CERTIFICATE".to_string(),
68            contents: cert.0.clone(),
69        })
70        .collect();
71    let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
72
73    let mut server_tls_config = rustls::ServerConfig::builder()
74        .with_safe_defaults()
75        .with_client_cert_verifier(SkipClientVerification::new())
76        .with_single_cert(cert_chain, priv_key)
77        .map_err(|_e| QuicServerError::ConfigureFailed)?;
78    server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
79
80    let mut server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
81    let config = Arc::get_mut(&mut server_config.transport).unwrap();
82
83    // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
84    const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32;
85    config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
86    config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
87    config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
88    let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
89    config.max_idle_timeout(Some(timeout));
90
91    // disable bidi & datagrams
92    const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
93    config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
94    config.datagram_receive_buffer_size(None);
95
96    Ok((server_config, cert_chain_pem))
97}
98
99fn rt() -> Runtime {
100    Builder::new_multi_thread()
101        .worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS)
102        .enable_all()
103        .build()
104        .unwrap()
105}
106
107#[derive(thiserror::Error, Debug)]
108pub enum QuicServerError {
109    #[error("Server configure failed")]
110    ConfigureFailed,
111
112    #[error("Endpoint creation failed")]
113    EndpointFailed,
114}
115
/// Atomic counters describing QUIC server activity, emitted by [`StreamStats::report`].
///
/// All counters start at zero via `Default`. `report` only loads
/// `total_connections` and `total_streams`, so those keep their value across
/// reports; every other field is swapped back to zero when it is reported.
#[derive(Default)]
pub struct StreamStats {
    // Read without being reset by `report` ("active_*" fields of the datapoint).
    pub(crate) total_connections: AtomicUsize,
    pub(crate) total_streams: AtomicUsize,

    // Connection and stream arrivals; reset by `report`.
    pub(crate) total_new_connections: AtomicUsize,
    pub(crate) total_new_streams: AtomicUsize,

    // Chunk and packet handling; reset by `report`.
    pub(crate) total_invalid_chunks: AtomicUsize,
    pub(crate) total_invalid_chunk_size: AtomicUsize,
    pub(crate) total_packets_allocated: AtomicUsize,
    pub(crate) total_chunks_received: AtomicUsize,
    pub(crate) total_packet_batch_send_err: AtomicUsize,
    pub(crate) total_packet_batches_sent: AtomicUsize,
    pub(crate) total_packet_batches_none: AtomicUsize,
    pub(crate) total_stream_read_errors: AtomicUsize,
    pub(crate) total_stream_read_timeouts: AtomicUsize,

    // Connection table bookkeeping; reset by `report`.
    pub(crate) num_evictions: AtomicUsize,
    pub(crate) connection_added_from_staked_peer: AtomicUsize,
    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
    pub(crate) connection_add_failed: AtomicUsize,
    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
    pub(crate) connection_add_failed_on_pruning: AtomicUsize,
    pub(crate) connection_setup_timeout: AtomicUsize,
    pub(crate) connection_setup_error: AtomicUsize,
    pub(crate) connection_removed: AtomicUsize,
    pub(crate) connection_remove_failed: AtomicUsize,
}
143
144impl StreamStats {
145    pub fn report(&self) {
146        datapoint_info!(
147            "quic-connections",
148            (
149                "active_connections",
150                self.total_connections.load(Ordering::Relaxed),
151                i64
152            ),
153            (
154                "active_streams",
155                self.total_streams.load(Ordering::Relaxed),
156                i64
157            ),
158            (
159                "new_connections",
160                self.total_new_connections.swap(0, Ordering::Relaxed),
161                i64
162            ),
163            (
164                "new_streams",
165                self.total_new_streams.swap(0, Ordering::Relaxed),
166                i64
167            ),
168            (
169                "evictions",
170                self.num_evictions.swap(0, Ordering::Relaxed),
171                i64
172            ),
173            (
174                "connection_added_from_staked_peer",
175                self.connection_added_from_staked_peer
176                    .swap(0, Ordering::Relaxed),
177                i64
178            ),
179            (
180                "connection_added_from_unstaked_peer",
181                self.connection_added_from_unstaked_peer
182                    .swap(0, Ordering::Relaxed),
183                i64
184            ),
185            (
186                "connection_add_failed",
187                self.connection_add_failed.swap(0, Ordering::Relaxed),
188                i64
189            ),
190            (
191                "connection_add_failed_invalid_stream_count",
192                self.connection_add_failed_invalid_stream_count
193                    .swap(0, Ordering::Relaxed),
194                i64
195            ),
196            (
197                "connection_add_failed_unstaked_node",
198                self.connection_add_failed_unstaked_node
199                    .swap(0, Ordering::Relaxed),
200                i64
201            ),
202            (
203                "connection_add_failed_on_pruning",
204                self.connection_add_failed_on_pruning
205                    .swap(0, Ordering::Relaxed),
206                i64
207            ),
208            (
209                "connection_removed",
210                self.connection_removed.swap(0, Ordering::Relaxed),
211                i64
212            ),
213            (
214                "connection_remove_failed",
215                self.connection_remove_failed.swap(0, Ordering::Relaxed),
216                i64
217            ),
218            (
219                "connection_setup_timeout",
220                self.connection_setup_timeout.swap(0, Ordering::Relaxed),
221                i64
222            ),
223            (
224                "connection_setup_error",
225                self.connection_setup_error.swap(0, Ordering::Relaxed),
226                i64
227            ),
228            (
229                "invalid_chunk",
230                self.total_invalid_chunks.swap(0, Ordering::Relaxed),
231                i64
232            ),
233            (
234                "invalid_chunk_size",
235                self.total_invalid_chunk_size.swap(0, Ordering::Relaxed),
236                i64
237            ),
238            (
239                "packets_allocated",
240                self.total_packets_allocated.swap(0, Ordering::Relaxed),
241                i64
242            ),
243            (
244                "chunks_received",
245                self.total_chunks_received.swap(0, Ordering::Relaxed),
246                i64
247            ),
248            (
249                "packet_batch_send_error",
250                self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
251                i64
252            ),
253            (
254                "packet_batches_sent",
255                self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
256                i64
257            ),
258            (
259                "packet_batch_empty",
260                self.total_packet_batches_none.swap(0, Ordering::Relaxed),
261                i64
262            ),
263            (
264                "stream_read_errors",
265                self.total_stream_read_errors.swap(0, Ordering::Relaxed),
266                i64
267            ),
268            (
269                "stream_read_timeouts",
270                self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
271                i64
272            ),
273        );
274    }
275}
276
277#[allow(clippy::too_many_arguments)]
278pub fn spawn_server(
279    sock: UdpSocket,
280    keypair: &Keypair,
281    gossip_host: IpAddr,
282    packet_sender: Sender<PacketBatch>,
283    exit: Arc<AtomicBool>,
284    max_connections_per_peer: usize,
285    staked_nodes: Arc<RwLock<StakedNodes>>,
286    max_staked_connections: usize,
287    max_unstaked_connections: usize,
288    stats: Arc<StreamStats>,
289) -> Result<thread::JoinHandle<()>, QuicServerError> {
290    let runtime = rt();
291    let task = {
292        let _guard = runtime.enter();
293        crate::nonblocking::quic::spawn_server(
294            sock,
295            keypair,
296            gossip_host,
297            packet_sender,
298            exit,
299            max_connections_per_peer,
300            staked_nodes,
301            max_staked_connections,
302            max_unstaked_connections,
303            stats,
304        )
305    }?;
306    let handle = thread::Builder::new()
307        .name("solQuicServer".into())
308        .spawn(move || {
309            if let Err(e) = runtime.block_on(task) {
310                warn!("error from runtime.block_on: {:?}", e);
311            }
312        })
313        .unwrap();
314    Ok(handle)
315}
316
#[cfg(test)]
mod test {
    use {
        super::*, crate::nonblocking::quic::test::*, crossbeam_channel::unbounded,
        std::net::SocketAddr,
    };

    /// Everything a test needs to drive and stop a server: the server thread,
    /// its exit flag, the receiving end of the packet channel, and the bound address.
    type TestServer = (
        std::thread::JoinHandle<()>,
        Arc<AtomicBool>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
    );

    /// Starts a server on an ephemeral localhost port with a fresh keypair, no
    /// staked nodes and default stats; only the two limits that differ between
    /// tests are parameters.
    fn setup_quic_server_with(
        max_connections_per_peer: usize,
        max_unstaked_connections: usize,
    ) -> TestServer {
        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let (sender, receiver) = unbounded();
        let keypair = Keypair::new();
        let ip = "127.0.0.1".parse().unwrap();
        let server_address = s.local_addr().unwrap();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let stats = Arc::new(StreamStats::default());
        let t = spawn_server(
            s,
            &keypair,
            ip,
            sender,
            exit.clone(),
            max_connections_per_peer,
            staked_nodes,
            MAX_STAKED_CONNECTIONS,
            max_unstaked_connections,
            stats,
        )
        .unwrap();
        (t, exit, receiver, server_address)
    }

    /// Default fixture: one connection per peer, default unstaked connection cap.
    fn setup_quic_server() -> TestServer {
        setup_quic_server_with(1, MAX_UNSTAKED_CONNECTIONS)
    }

    /// Signals the server to exit and waits for its thread to finish.
    fn stop_quic_server(exit: &AtomicBool, t: std::thread::JoinHandle<()>) {
        exit.store(true, Ordering::Relaxed);
        t.join().unwrap();
    }

    #[test]
    fn test_quic_server_exit() {
        let (t, exit, _receiver, _server_address) = setup_quic_server();
        stop_quic_server(&exit, t);
    }

    #[test]
    fn test_quic_timeout() {
        solana_logger::setup();
        let (t, exit, receiver, server_address) = setup_quic_server();
        let runtime = rt();
        runtime.block_on(check_timeout(receiver, server_address));
        stop_quic_server(&exit, t);
    }

    #[test]
    fn test_quic_server_block_multiple_connections() {
        solana_logger::setup();
        let (t, exit, _receiver, server_address) = setup_quic_server();

        let runtime = rt();
        runtime.block_on(check_block_multiple_connections(server_address));
        stop_quic_server(&exit, t);
    }

    #[test]
    fn test_quic_server_multiple_streams() {
        solana_logger::setup();
        // Two connections per peer are allowed for this test.
        let (t, exit, receiver, server_address) =
            setup_quic_server_with(2, MAX_UNSTAKED_CONNECTIONS);

        let runtime = rt();
        runtime.block_on(check_multiple_streams(receiver, server_address));
        stop_quic_server(&exit, t);
    }

    #[test]
    fn test_quic_server_multiple_writes() {
        solana_logger::setup();
        let (t, exit, receiver, server_address) = setup_quic_server();

        let runtime = rt();
        runtime.block_on(check_multiple_writes(receiver, server_address, None));
        stop_quic_server(&exit, t);
    }

    #[test]
    fn test_quic_server_unstaked_node_connect_failure() {
        solana_logger::setup();
        // Do not allow any connection from unstaked clients/nodes. The receiver
        // is held (not dropped) so the packet channel stays connected for the
        // duration of the test.
        let (t, exit, _receiver, server_address) = setup_quic_server_with(1, 0);

        let runtime = rt();
        runtime.block_on(check_unstaked_node_connect_failure(server_address));
        stop_quic_server(&exit, t);
    }
}
454}