#![warn(missing_docs)]
pub mod middleware;
pub mod utils;
use std::{
convert::Infallible, error::Error as StdError, net::SocketAddr, num::NonZeroU32, time::Duration,
};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
};
use jsonrpsee::{
server::{
middleware::http::ProxyGetRequestLayer, stop_channel, ws, PingConfig, StopHandle,
TowerServiceBuilder,
},
Methods, RpcModule,
};
use tokio::net::TcpListener;
use tower::Service;
use utils::{build_rpc_api, format_cors, get_proxy_ip, host_filtering, try_into_cors};
pub use ip_network::IpNetwork;
pub use jsonrpsee::{
core::{
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
traits::IdProvider,
},
server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
};
pub use middleware::{Metrics, MiddlewareLayer, RpcMetrics};
const MEGABYTE: u32 = 1024 * 1024;
pub type Server = jsonrpsee::server::ServerHandle;
#[derive(Debug)]
pub struct Config<'a, M: Send + Sync + 'static> {
pub addrs: [SocketAddr; 2],
pub cors: Option<&'a Vec<String>>,
pub max_connections: u32,
pub max_subs_per_conn: u32,
pub max_payload_in_mb: u32,
pub max_payload_out_mb: u32,
pub metrics: Option<RpcMetrics>,
pub message_buffer_capacity: u32,
pub rpc_api: RpcModule<M>,
pub id_provider: Option<Box<dyn IdProvider>>,
pub tokio_handle: tokio::runtime::Handle,
pub batch_config: BatchRequestConfig,
pub rate_limit: Option<NonZeroU32>,
pub rate_limit_whitelisted_ips: Vec<IpNetwork>,
pub rate_limit_trust_proxy_headers: bool,
}
#[derive(Debug, Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
methods: Methods,
stop_handle: StopHandle,
metrics: Option<RpcMetrics>,
tokio_handle: tokio::runtime::Handle,
service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
}
pub async fn start_server<M>(
config: Config<'_, M>,
) -> Result<Server, Box<dyn StdError + Send + Sync>>
where
M: Send + Sync,
{
let Config {
addrs,
batch_config,
cors,
max_payload_in_mb,
max_payload_out_mb,
max_connections,
max_subs_per_conn,
metrics,
message_buffer_capacity,
id_provider,
tokio_handle,
rpc_api,
rate_limit,
rate_limit_whitelisted_ips,
rate_limit_trust_proxy_headers,
} = config;
let std_listener = TcpListener::bind(addrs.as_slice()).await?.into_std()?;
let local_addr = std_listener.local_addr().ok();
let host_filter = host_filtering(cors.is_some(), local_addr);
let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.layer(try_into_cors(cors)?);
let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subs_per_conn)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(message_buffer_capacity)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(tokio_handle.clone());
if let Some(provider) = id_provider {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};
let (stop_handle, server_handle) = stop_channel();
let cfg = PerConnection {
methods: build_rpc_api(rpc_api).into(),
service_builder: builder.to_service_builder(),
metrics,
tokio_handle,
stop_handle: stop_handle.clone(),
};
let make_service = make_service_fn(move |addr: &AddrStream| {
let cfg = cfg.clone();
let rate_limit_whitelisted_ips = rate_limit_whitelisted_ips.clone();
let ip = addr.remote_addr().ip();
async move {
let cfg = cfg.clone();
let rate_limit_whitelisted_ips = rate_limit_whitelisted_ips.clone();
Ok::<_, Infallible>(service_fn(move |req| {
let proxy_ip =
if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };
let rate_limit_cfg = if rate_limit_whitelisted_ips
.iter()
.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
{
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
None
} else {
if !rate_limit_whitelisted_ips.is_empty() {
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
}
rate_limit
};
let PerConnection { service_builder, metrics, tokio_handle, stop_handle, methods } =
cfg.clone();
let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };
let middleware_layer = match (metrics, rate_limit_cfg) {
(None, None) => None,
(Some(metrics), None) => Some(
MiddlewareLayer::new().with_metrics(Metrics::new(metrics, transport_label)),
),
(None, Some(rate_limit)) =>
Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
(Some(metrics), Some(rate_limit)) => Some(
MiddlewareLayer::new()
.with_metrics(Metrics::new(metrics, transport_label))
.with_rate_limit_per_minute(rate_limit),
),
};
let rpc_middleware =
RpcServiceBuilder::new().option_layer(middleware_layer.clone());
let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);
async move {
if is_websocket {
let on_disconnect = svc.on_session_closed();
tokio_handle.spawn(async move {
let now = std::time::Instant::now();
middleware_layer.as_ref().map(|m| m.ws_connect());
on_disconnect.await;
middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
});
}
svc.call(req).await
}
}))
}
});
let server = hyper::Server::from_tcp(std_listener)?.serve(make_service);
tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async move { stop_handle.shutdown().await });
let _ = graceful.await;
});
log::info!(
"Running JSON-RPC server: addr={}, allowed origins={}",
local_addr.map_or_else(|| "unknown".to_string(), |a| a.to_string()),
format_cors(cors)
);
Ok(server_handle)
}