solana_rpc_client/
http_sender.rs

1//! Nonblocking [`RpcSender`] over HTTP.
2
3use {
4    crate::rpc_sender::*,
5    async_trait::async_trait,
6    log::*,
7    reqwest::{
8        self,
9        header::{self, CONTENT_TYPE, RETRY_AFTER},
10        StatusCode,
11    },
12    solana_rpc_client_api::{
13        client_error::Result,
14        custom_error,
15        error_object::RpcErrorObject,
16        request::{RpcError, RpcRequest, RpcResponseErrorData},
17        response::RpcSimulateTransactionResult,
18    },
19    std::{
20        sync::{
21            atomic::{AtomicU64, Ordering},
22            Arc, RwLock,
23        },
24        time::{Duration, Instant},
25    },
26    tokio::time::sleep,
27};
28
29pub struct HttpSender {
30    client: Arc<reqwest_middleware::ClientWithMiddleware>,
31    url: String,
32    request_id: AtomicU64,
33    stats: RwLock<RpcTransportStats>,
34}
35
36/// Nonblocking [`RpcSender`] over HTTP.
37impl HttpSender {
38    /// Create an HTTP RPC sender.
39    ///
40    /// The URL is an HTTP URL, usually for port 8899, as in
41    /// "http://localhost:8899". The sender has a default timeout of 30 seconds.
42    pub fn new<U: ToString>(url: U) -> Self {
43        Self::new_with_timeout(url, Duration::from_secs(30))
44    }
45
46    /// Create an HTTP RPC sender.
47    ///
48    /// The URL is an HTTP URL, usually for port 8899.
49    pub fn new_with_timeout<U: ToString>(url: U, timeout: Duration) -> Self {
50        Self::new_with_client(
51            url,
52            reqwest::Client::builder()
53                .default_headers(Self::default_headers())
54                .timeout(timeout)
55                .pool_idle_timeout(timeout)
56                .build()
57                .expect("build rpc client"),
58        )
59    }
60
61    /// Create an HTTP RPC sender.
62    ///
63    /// Most flexible way to create a sender. Pass a created `reqwest::Client`.
64    pub fn new_with_client<U: ToString>(url: U, client: reqwest::Client) -> Self {
65        Self {
66            client: Arc::new(reqwest_middleware::ClientBuilder::new(client).build()),
67            url: url.to_string(),
68            request_id: AtomicU64::new(0),
69            stats: RwLock::new(RpcTransportStats::default()),
70        }
71    }
72
73    /// Create an HTTP RPC sender.
74    ///
75    /// Most flexible way to create a sender with middleware. Pass a created `reqwest_middleware::ClientWithMiddleware`.
76    pub fn new_with_client_with_middleware<U: ToString>(
77        url: U,
78        client: reqwest_middleware::ClientWithMiddleware,
79    ) -> Self {
80        Self {
81            client: Arc::new(client),
82            url: url.to_string(),
83            request_id: AtomicU64::new(0),
84            stats: RwLock::new(RpcTransportStats::default()),
85        }
86    }
87
88    /// Create default headers used by HTTP Sender.
89    pub fn default_headers() -> header::HeaderMap {
90        let mut default_headers = header::HeaderMap::new();
91        default_headers.append(
92            header::HeaderName::from_static("solana-client"),
93            header::HeaderValue::from_str(
94                format!("rust/{}", solana_version::Version::default()).as_str(),
95            )
96            .unwrap(),
97        );
98        default_headers
99    }
100}
101
102struct StatsUpdater<'a> {
103    stats: &'a RwLock<RpcTransportStats>,
104    request_start_time: Instant,
105    rate_limited_time: Duration,
106}
107
108impl<'a> StatsUpdater<'a> {
109    fn new(stats: &'a RwLock<RpcTransportStats>) -> Self {
110        Self {
111            stats,
112            request_start_time: Instant::now(),
113            rate_limited_time: Duration::default(),
114        }
115    }
116
117    fn add_rate_limited_time(&mut self, duration: Duration) {
118        self.rate_limited_time += duration;
119    }
120}
121
122impl Drop for StatsUpdater<'_> {
123    fn drop(&mut self) {
124        let mut stats = self.stats.write().unwrap();
125        stats.request_count += 1;
126        stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
127        stats.rate_limited_time += self.rate_limited_time;
128    }
129}
130
131#[async_trait]
132impl RpcSender for HttpSender {
133    fn get_transport_stats(&self) -> RpcTransportStats {
134        self.stats.read().unwrap().clone()
135    }
136
137    async fn send(
138        &self,
139        request: RpcRequest,
140        params: serde_json::Value,
141    ) -> Result<serde_json::Value> {
142        let mut stats_updater = StatsUpdater::new(&self.stats);
143
144        let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
145        let request_json = request.build_request_json(request_id, params).to_string();
146
147        let mut too_many_requests_retries = 5;
148        loop {
149            let response = {
150                let client = self.client.clone();
151                let request_json = request_json.clone();
152                client
153                    .post(&self.url)
154                    .header(CONTENT_TYPE, "application/json")
155                    .body(request_json)
156                    .send()
157                    .await
158            }?;
159
160            if !response.status().is_success() {
161                if response.status() == StatusCode::TOO_MANY_REQUESTS
162                    && too_many_requests_retries > 0
163                {
164                    let mut duration = Duration::from_millis(500);
165                    if let Some(retry_after) = response.headers().get(RETRY_AFTER) {
166                        if let Ok(retry_after) = retry_after.to_str() {
167                            if let Ok(retry_after) = retry_after.parse::<u64>() {
168                                if retry_after < 120 {
169                                    duration = Duration::from_secs(retry_after);
170                                }
171                            }
172                        }
173                    }
174
175                    too_many_requests_retries -= 1;
176                    debug!(
177                                "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
178                                response, too_many_requests_retries, duration
179                            );
180
181                    sleep(duration).await;
182                    stats_updater.add_rate_limited_time(duration);
183                    continue;
184                }
185                return Err(response.error_for_status().unwrap_err().into());
186            }
187
188            let mut json = response.json::<serde_json::Value>().await?;
189            if json["error"].is_object() {
190                return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) {
191                    Ok(rpc_error_object) => {
192                        let data = match rpc_error_object.code {
193                                    custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => {
194                                        match serde_json::from_value::<RpcSimulateTransactionResult>(json["error"]["data"].clone()) {
195                                            Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data),
196                                            Err(err) => {
197                                                debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err);
198                                                RpcResponseErrorData::Empty
199                                            }
200                                        }
201                                    },
202                                    custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => {
203                                        match serde_json::from_value::<custom_error::NodeUnhealthyErrorData>(json["error"]["data"].clone()) {
204                                            Ok(custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind},
205                                            Err(_err) => {
206                                                RpcResponseErrorData::Empty
207                                            }
208                                        }
209                                    },
210                                    _ => RpcResponseErrorData::Empty
211                                };
212
213                        Err(RpcError::RpcResponseError {
214                            code: rpc_error_object.code,
215                            message: rpc_error_object.message,
216                            data,
217                        }
218                        .into())
219                    }
220                    Err(err) => Err(RpcError::RpcRequestError(format!(
221                        "Failed to deserialize RPC error response: {} [{}]",
222                        serde_json::to_string(&json["error"]).unwrap(),
223                        err
224                    ))
225                    .into()),
226                };
227            }
228            return Ok(json["result"].take());
229        }
230    }
231
232    fn url(&self) -> String {
233        self.url.clone()
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240
241    #[tokio::test(flavor = "multi_thread")]
242    async fn http_sender_on_tokio_multi_thread() {
243        let http_sender = HttpSender::new("http://localhost:1234".to_string());
244        let _ = http_sender
245            .send(RpcRequest::GetVersion, serde_json::Value::Null)
246            .await;
247    }
248
249    #[tokio::test(flavor = "current_thread")]
250    async fn http_sender_on_tokio_current_thread() {
251        let http_sender = HttpSender::new("http://localhost:1234".to_string());
252        let _ = http_sender
253            .send(RpcRequest::GetVersion, serde_json::Value::Null)
254            .await;
255    }
256}