solana_quic_client/
quic_client.rs

1//! Simple client that connects to a given UDP port with the QUIC protocol and provides
2//! an interface for sending data which is restricted by the server's flow control.
3
4use {
5    crate::nonblocking::quic_client::{
6        QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint,
7    },
8    lazy_static::lazy_static,
9    log::*,
10    solana_connection_cache::{
11        client_connection::{ClientConnection, ClientStats},
12        connection_cache_stats::ConnectionCacheStats,
13        nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
14    },
15    solana_transaction_error::{TransportError, TransportResult},
16    std::{
17        net::SocketAddr,
18        sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
19        time::Duration,
20    },
21    tokio::{runtime::Runtime, time::timeout},
22};
23
/// Upper bound on the number of asynchronous send tasks that may be
/// outstanding at once; used as the permit count of `ASYNC_TASK_SEMAPHORE`.
pub const MAX_OUTSTANDING_TASK: u64 = 2_000;
/// Time budget for sending a single buffer. Batch sends scale this by the
/// number of buffers in the batch.
const SEND_DATA_TIMEOUT: Duration = Duration::from_millis(10_000);
26
/// A counting semaphore used for limiting the number of asynchronous tasks
/// spawned onto the runtime. Before spawning a task, call `acquire`. After the
/// task is done (be it success or failure), call `release`.
struct AsyncTaskSemaphore {
    /// Number of permits currently handed out (acquired but not yet released).
    counter: Mutex<u64>,
    /// Condition variable signalled every time `counter` is decremented.
    cond_var: Condvar,
    /// The maximum number of permits that may be handed out at once.
    permits: u64,
}

impl AsyncTaskSemaphore {
    /// Creates a semaphore that allows at most `permits` outstanding permits.
    pub fn new(permits: u64) -> Self {
        Self {
            counter: Mutex::new(0),
            cond_var: Condvar::new(),
            permits,
        }
    }

    /// Blocks the calling thread until a permit is free, then takes it.
    ///
    /// When returned, the lock has been locked and the usage count has been
    /// incremented. When the returned `MutexGuard` is dropped the lock is
    /// released without decrementing the usage count; the permit is only
    /// given back by `release`.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn acquire(&self) -> MutexGuard<'_, u64> {
        let in_use = self.counter.lock().unwrap();
        // Wait for a free permit *before* counting this caller. If blocked
        // callers were counted too, the counter could stay above `permits`
        // even after every running task had released, and the waiters would
        // never be allowed through.
        let mut in_use = self
            .cond_var
            .wait_while(in_use, |count| *count >= self.permits)
            .unwrap();
        *in_use += 1;
        in_use
    }

    /// Acquires the lock, decrements the usage count and wakes one waiter.
    ///
    /// Must be paired with an earlier `acquire`.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn release(&self) {
        let mut in_use = self.counter.lock().unwrap();
        *in_use -= 1;
        // Exactly one permit became free, so waking a single waiter suffices.
        self.cond_var.notify_one();
    }
}
67
68lazy_static! {
69    static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
70        AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
71    static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
72        .thread_name("solQuicClientRt")
73        .enable_all()
74        .build()
75        .unwrap();
76}
77
78async fn send_data_async(
79    connection: Arc<NonblockingQuicConnection>,
80    buffer: Vec<u8>,
81) -> TransportResult<()> {
82    let result = timeout(SEND_DATA_TIMEOUT, connection.send_data(&buffer)).await;
83    ASYNC_TASK_SEMAPHORE.release();
84    handle_send_result(result, connection)
85}
86
87async fn send_data_batch_async(
88    connection: Arc<NonblockingQuicConnection>,
89    buffers: Vec<Vec<u8>>,
90) -> TransportResult<()> {
91    let result = timeout(
92        u32::try_from(buffers.len())
93            .map(|size| SEND_DATA_TIMEOUT.saturating_mul(size))
94            .unwrap_or(Duration::MAX),
95        connection.send_data_batch(&buffers),
96    )
97    .await;
98    ASYNC_TASK_SEMAPHORE.release();
99    handle_send_result(result, connection)
100}
101
102/// Check the send result and update stats if timedout. Returns the checked result.
103fn handle_send_result(
104    result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
105    connection: Arc<NonblockingQuicConnection>,
106) -> Result<(), TransportError> {
107    match result {
108        Ok(result) => result,
109        Err(_err) => {
110            let client_stats = ClientStats::default();
111            client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
112            let stats = connection.connection_stats();
113            stats.add_client_stats(&client_stats, 0, false);
114            info!("Timedout sending data {:?}", connection.server_addr());
115            Err(TransportError::Custom("Timedout sending data".to_string()))
116        }
117    }
118}
119
120pub struct QuicClientConnection {
121    pub inner: Arc<NonblockingQuicConnection>,
122}
123
124impl QuicClientConnection {
125    pub fn new(
126        endpoint: Arc<QuicLazyInitializedEndpoint>,
127        server_addr: SocketAddr,
128        connection_stats: Arc<ConnectionCacheStats>,
129    ) -> Self {
130        let inner = Arc::new(NonblockingQuicConnection::new(
131            endpoint,
132            server_addr,
133            connection_stats,
134        ));
135        Self { inner }
136    }
137
138    pub fn new_with_client(
139        client: Arc<QuicClient>,
140        connection_stats: Arc<ConnectionCacheStats>,
141    ) -> Self {
142        let inner = Arc::new(NonblockingQuicConnection::new_with_client(
143            client,
144            connection_stats,
145        ));
146        Self { inner }
147    }
148}
149
150impl ClientConnection for QuicClientConnection {
151    fn server_addr(&self) -> &SocketAddr {
152        self.inner.server_addr()
153    }
154
155    fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
156        RUNTIME.block_on(self.inner.send_data_batch(buffers))?;
157        Ok(())
158    }
159
160    fn send_data_async(&self, data: Vec<u8>) -> TransportResult<()> {
161        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
162        let inner = self.inner.clone();
163
164        let _handle = RUNTIME.spawn(send_data_async(inner, data));
165        Ok(())
166    }
167
168    fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
169        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
170        let inner = self.inner.clone();
171        let _handle = RUNTIME.spawn(send_data_batch_async(inner, buffers));
172        Ok(())
173    }
174
175    fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
176        RUNTIME.block_on(self.inner.send_data(buffer))?;
177        Ok(())
178    }
179}