safecoin_client/
quic_client.rs

1//! Simple client that connects to a given UDP port with the QUIC protocol and provides
2//! an interface for sending transactions which is restricted by the server's flow control.
3
4use {
5    crate::{
6        connection_cache::ConnectionCacheStats,
7        nonblocking::{
8            quic_client::{
9                QuicClient, QuicLazyInitializedEndpoint,
10                QuicTpuConnection as NonblockingQuicTpuConnection,
11            },
12            tpu_connection::TpuConnection as NonblockingTpuConnection,
13        },
14        tpu_connection::{ClientStats, TpuConnection},
15    },
16    lazy_static::lazy_static,
17    log::*,
18    solana_sdk::transport::{Result as TransportResult, TransportError},
19    std::{
20        net::SocketAddr,
21        sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
22        time::Duration,
23    },
24    tokio::{runtime::Runtime, time::timeout},
25};
26
27const MAX_OUTSTANDING_TASK: u64 = 2000;
28const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;
29
30/// A semaphore used for limiting the number of asynchronous tasks spawn to the
31/// runtime. Before spawnning a task, use acquire. After the task is done (be it
32/// succsess or failure), call release.
33struct AsyncTaskSemaphore {
34    /// Keep the counter info about the usage
35    counter: Mutex<u64>,
36    /// Conditional variable for signaling when counter is decremented
37    cond_var: Condvar,
38    /// The maximum usage allowed by this semaphore.
39    permits: u64,
40}
41
42impl AsyncTaskSemaphore {
43    fn new(permits: u64) -> Self {
44        Self {
45            counter: Mutex::new(0),
46            cond_var: Condvar::new(),
47            permits,
48        }
49    }
50
51    /// When returned, the lock has been locked and usage count has been
52    /// incremented. When the returned MutexGuard is dropped the lock is dropped
53    /// without decrementing the usage count.
54    fn acquire(&self) -> MutexGuard<u64> {
55        let mut count = self.counter.lock().unwrap();
56        *count += 1;
57        while *count > self.permits {
58            count = self.cond_var.wait(count).unwrap();
59        }
60        count
61    }
62
63    /// Acquire the lock and decrement the usage count
64    fn release(&self) {
65        let mut count = self.counter.lock().unwrap();
66        *count -= 1;
67        self.cond_var.notify_one();
68    }
69}
70
71lazy_static! {
72    static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
73        AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
74    static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
75        .enable_all()
76        .build()
77        .unwrap();
78}
79
80pub struct QuicTpuConnection {
81    inner: Arc<NonblockingQuicTpuConnection>,
82}
83impl QuicTpuConnection {
84    pub fn new(
85        endpoint: Arc<QuicLazyInitializedEndpoint>,
86        tpu_addr: SocketAddr,
87        connection_stats: Arc<ConnectionCacheStats>,
88    ) -> Self {
89        let inner = Arc::new(NonblockingQuicTpuConnection::new(
90            endpoint,
91            tpu_addr,
92            connection_stats,
93        ));
94        Self { inner }
95    }
96
97    pub fn new_with_client(
98        client: Arc<QuicClient>,
99        connection_stats: Arc<ConnectionCacheStats>,
100    ) -> Self {
101        let inner = Arc::new(NonblockingQuicTpuConnection::new_with_client(
102            client,
103            connection_stats,
104        ));
105        Self { inner }
106    }
107}
108
109async fn send_wire_transaction_async(
110    connection: Arc<NonblockingQuicTpuConnection>,
111    wire_transaction: Vec<u8>,
112) -> TransportResult<()> {
113    let result = timeout(
114        Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
115        connection.send_wire_transaction(wire_transaction),
116    )
117    .await;
118    ASYNC_TASK_SEMAPHORE.release();
119    handle_send_result(result, connection)
120}
121
122async fn send_wire_transaction_batch_async(
123    connection: Arc<NonblockingQuicTpuConnection>,
124    buffers: Vec<Vec<u8>>,
125) -> TransportResult<()> {
126    let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64;
127
128    let result = timeout(
129        Duration::from_millis(time_out),
130        connection.send_wire_transaction_batch(&buffers),
131    )
132    .await;
133    ASYNC_TASK_SEMAPHORE.release();
134    handle_send_result(result, connection)
135}
136
137/// Check the send result and update stats if timedout. Returns the checked result.
138fn handle_send_result(
139    result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
140    connection: Arc<NonblockingQuicTpuConnection>,
141) -> Result<(), TransportError> {
142    match result {
143        Ok(result) => result,
144        Err(_err) => {
145            let client_stats = ClientStats::default();
146            client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
147            let stats = connection.connection_stats();
148            stats.add_client_stats(&client_stats, 0, false);
149            info!("Timedout sending transaction {:?}", connection.tpu_addr());
150            Err(TransportError::Custom(
151                "Timedout sending transaction".to_string(),
152            ))
153        }
154    }
155}
156
157impl TpuConnection for QuicTpuConnection {
158    fn tpu_addr(&self) -> &SocketAddr {
159        self.inner.tpu_addr()
160    }
161
162    fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
163    where
164        T: AsRef<[u8]> + Send + Sync,
165    {
166        RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
167        Ok(())
168    }
169
170    fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
171        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
172        let inner = self.inner.clone();
173
174        let _ = RUNTIME
175            .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await });
176        Ok(())
177    }
178
179    fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
180        let _lock = ASYNC_TASK_SEMAPHORE.acquire();
181        let inner = self.inner.clone();
182        let _ =
183            RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await });
184        Ok(())
185    }
186}