solana_rpc_client/
http_sender.rs1use {
4 crate::rpc_sender::*,
5 async_trait::async_trait,
6 log::*,
7 reqwest::{
8 self,
9 header::{self, CONTENT_TYPE, RETRY_AFTER},
10 StatusCode,
11 },
12 solana_rpc_client_api::{
13 client_error::Result,
14 custom_error,
15 error_object::RpcErrorObject,
16 request::{RpcError, RpcRequest, RpcResponseErrorData},
17 response::RpcSimulateTransactionResult,
18 },
19 std::{
20 sync::{
21 atomic::{AtomicU64, Ordering},
22 Arc, RwLock,
23 },
24 time::{Duration, Instant},
25 },
26 tokio::time::sleep,
27};
28
29pub struct HttpSender {
30 client: Arc<reqwest_middleware::ClientWithMiddleware>,
31 url: String,
32 request_id: AtomicU64,
33 stats: RwLock<RpcTransportStats>,
34}
35
36impl HttpSender {
38 pub fn new<U: ToString>(url: U) -> Self {
43 Self::new_with_timeout(url, Duration::from_secs(30))
44 }
45
46 pub fn new_with_timeout<U: ToString>(url: U, timeout: Duration) -> Self {
50 Self::new_with_client(
51 url,
52 reqwest::Client::builder()
53 .default_headers(Self::default_headers())
54 .timeout(timeout)
55 .pool_idle_timeout(timeout)
56 .build()
57 .expect("build rpc client"),
58 )
59 }
60
61 pub fn new_with_client<U: ToString>(url: U, client: reqwest::Client) -> Self {
65 Self {
66 client: Arc::new(reqwest_middleware::ClientBuilder::new(client).build()),
67 url: url.to_string(),
68 request_id: AtomicU64::new(0),
69 stats: RwLock::new(RpcTransportStats::default()),
70 }
71 }
72
73 pub fn new_with_client_with_middleware<U: ToString>(
77 url: U,
78 client: reqwest_middleware::ClientWithMiddleware,
79 ) -> Self {
80 Self {
81 client: Arc::new(client),
82 url: url.to_string(),
83 request_id: AtomicU64::new(0),
84 stats: RwLock::new(RpcTransportStats::default()),
85 }
86 }
87
88 pub fn default_headers() -> header::HeaderMap {
90 let mut default_headers = header::HeaderMap::new();
91 default_headers.append(
92 header::HeaderName::from_static("solana-client"),
93 header::HeaderValue::from_str(
94 format!("rust/{}", solana_version::Version::default()).as_str(),
95 )
96 .unwrap(),
97 );
98 default_headers
99 }
100}
101
102struct StatsUpdater<'a> {
103 stats: &'a RwLock<RpcTransportStats>,
104 request_start_time: Instant,
105 rate_limited_time: Duration,
106}
107
108impl<'a> StatsUpdater<'a> {
109 fn new(stats: &'a RwLock<RpcTransportStats>) -> Self {
110 Self {
111 stats,
112 request_start_time: Instant::now(),
113 rate_limited_time: Duration::default(),
114 }
115 }
116
117 fn add_rate_limited_time(&mut self, duration: Duration) {
118 self.rate_limited_time += duration;
119 }
120}
121
122impl Drop for StatsUpdater<'_> {
123 fn drop(&mut self) {
124 let mut stats = self.stats.write().unwrap();
125 stats.request_count += 1;
126 stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
127 stats.rate_limited_time += self.rate_limited_time;
128 }
129}
130
131#[async_trait]
132impl RpcSender for HttpSender {
133 fn get_transport_stats(&self) -> RpcTransportStats {
134 self.stats.read().unwrap().clone()
135 }
136
137 async fn send(
138 &self,
139 request: RpcRequest,
140 params: serde_json::Value,
141 ) -> Result<serde_json::Value> {
142 let mut stats_updater = StatsUpdater::new(&self.stats);
143
144 let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
145 let request_json = request.build_request_json(request_id, params).to_string();
146
147 let mut too_many_requests_retries = 5;
148 loop {
149 let response = {
150 let client = self.client.clone();
151 let request_json = request_json.clone();
152 client
153 .post(&self.url)
154 .header(CONTENT_TYPE, "application/json")
155 .body(request_json)
156 .send()
157 .await
158 }?;
159
160 if !response.status().is_success() {
161 if response.status() == StatusCode::TOO_MANY_REQUESTS
162 && too_many_requests_retries > 0
163 {
164 let mut duration = Duration::from_millis(500);
165 if let Some(retry_after) = response.headers().get(RETRY_AFTER) {
166 if let Ok(retry_after) = retry_after.to_str() {
167 if let Ok(retry_after) = retry_after.parse::<u64>() {
168 if retry_after < 120 {
169 duration = Duration::from_secs(retry_after);
170 }
171 }
172 }
173 }
174
175 too_many_requests_retries -= 1;
176 debug!(
177 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
178 response, too_many_requests_retries, duration
179 );
180
181 sleep(duration).await;
182 stats_updater.add_rate_limited_time(duration);
183 continue;
184 }
185 return Err(response.error_for_status().unwrap_err().into());
186 }
187
188 let mut json = response.json::<serde_json::Value>().await?;
189 if json["error"].is_object() {
190 return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) {
191 Ok(rpc_error_object) => {
192 let data = match rpc_error_object.code {
193 custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => {
194 match serde_json::from_value::<RpcSimulateTransactionResult>(json["error"]["data"].clone()) {
195 Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data),
196 Err(err) => {
197 debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err);
198 RpcResponseErrorData::Empty
199 }
200 }
201 },
202 custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => {
203 match serde_json::from_value::<custom_error::NodeUnhealthyErrorData>(json["error"]["data"].clone()) {
204 Ok(custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind},
205 Err(_err) => {
206 RpcResponseErrorData::Empty
207 }
208 }
209 },
210 _ => RpcResponseErrorData::Empty
211 };
212
213 Err(RpcError::RpcResponseError {
214 code: rpc_error_object.code,
215 message: rpc_error_object.message,
216 data,
217 }
218 .into())
219 }
220 Err(err) => Err(RpcError::RpcRequestError(format!(
221 "Failed to deserialize RPC error response: {} [{}]",
222 serde_json::to_string(&json["error"]).unwrap(),
223 err
224 ))
225 .into()),
226 };
227 }
228 return Ok(json["result"].take());
229 }
230 }
231
232 fn url(&self) -> String {
233 self.url.clone()
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240
241 #[tokio::test(flavor = "multi_thread")]
242 async fn http_sender_on_tokio_multi_thread() {
243 let http_sender = HttpSender::new("http://localhost:1234".to_string());
244 let _ = http_sender
245 .send(RpcRequest::GetVersion, serde_json::Value::Null)
246 .await;
247 }
248
249 #[tokio::test(flavor = "current_thread")]
250 async fn http_sender_on_tokio_current_thread() {
251 let http_sender = HttpSender::new("http://localhost:1234".to_string());
252 let _ = http_sender
253 .send(RpcRequest::GetVersion, serde_json::Value::Null)
254 .await;
255 }
256}