sc_rpc_server/middleware/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! JSON-RPC specific middleware.
20
21use std::{
22	num::NonZeroU32,
23	time::{Duration, Instant},
24};
25
26use futures::future::{BoxFuture, FutureExt};
27use governor::{clock::Clock, Jitter};
28use jsonrpsee::{
29	server::middleware::rpc::RpcServiceT,
30	types::{ErrorObject, Id, Request},
31	MethodResponse,
32};
33
34mod metrics;
35mod node_health;
36mod rate_limit;
37
38pub use metrics::*;
39pub use node_health::*;
40pub use rate_limit::*;
41
/// Upper bound of the jitter that is added on top of the wait time
/// reported by the rate limiter before a rate-limited call is retried.
const MAX_JITTER: Duration = Duration::from_millis(50);
/// Maximum number of times a rate-limited call sleeps and retries
/// before it is rejected with a "rate limit exceeded" error.
const MAX_RETRIES: usize = 10;
44
/// JSON-RPC middleware layer.
///
/// Holds the optional rate-limit and metrics configuration and hands a
/// clone of each to every [`Middleware`] it creates. The `Default` value
/// has both features disabled.
#[derive(Debug, Clone, Default)]
pub struct MiddlewareLayer {
	/// Rate limiter applied to every call; `None` disables rate limiting.
	rate_limit: Option<RateLimit>,
	/// Metrics recorder; `None` disables metrics.
	metrics: Option<Metrics>,
}
51
52impl MiddlewareLayer {
53	/// Create an empty MiddlewareLayer.
54	pub fn new() -> Self {
55		Self::default()
56	}
57
58	/// Enable new rate limit middleware enforced per minute.
59	pub fn with_rate_limit_per_minute(self, n: NonZeroU32) -> Self {
60		Self { rate_limit: Some(RateLimit::per_minute(n)), metrics: self.metrics }
61	}
62
63	/// Enable metrics middleware.
64	pub fn with_metrics(self, metrics: Metrics) -> Self {
65		Self { rate_limit: self.rate_limit, metrics: Some(metrics) }
66	}
67
68	/// Register a new websocket connection.
69	pub fn ws_connect(&self) {
70		self.metrics.as_ref().map(|m| m.ws_connect());
71	}
72
73	/// Register that a websocket connection was closed.
74	pub fn ws_disconnect(&self, now: Instant) {
75		self.metrics.as_ref().map(|m| m.ws_disconnect(now));
76	}
77}
78
79impl<S> tower::Layer<S> for MiddlewareLayer {
80	type Service = Middleware<S>;
81
82	fn layer(&self, service: S) -> Self::Service {
83		Middleware { service, rate_limit: self.rate_limit.clone(), metrics: self.metrics.clone() }
84	}
85}
86
/// JSON-RPC middleware that handles metrics and rate-limiting.
///
/// Both concerns live in the same middleware because the metrics need
/// to know whether a call was rate-limited: being rate-limited affects
/// the round-trip time of the call.
pub struct Middleware<S> {
	/// Inner service that calls are forwarded to once they pass the rate limit.
	service: S,
	/// Rate limiter applied to every call; `None` disables rate limiting.
	rate_limit: Option<RateLimit>,
	/// Metrics recorder; `None` disables metrics.
	metrics: Option<Metrics>,
}
99
100impl<'a, S> RpcServiceT<'a> for Middleware<S>
101where
102	S: Send + Sync + RpcServiceT<'a> + Clone + 'static,
103{
104	type Future = BoxFuture<'a, MethodResponse>;
105
106	fn call(&self, req: Request<'a>) -> Self::Future {
107		let now = Instant::now();
108
109		self.metrics.as_ref().map(|m| m.on_call(&req));
110
111		let service = self.service.clone();
112		let rate_limit = self.rate_limit.clone();
113		let metrics = self.metrics.clone();
114
115		async move {
116			let mut is_rate_limited = false;
117
118			if let Some(limit) = rate_limit.as_ref() {
119				let mut attempts = 0;
120				let jitter = Jitter::up_to(MAX_JITTER);
121
122				loop {
123					if attempts >= MAX_RETRIES {
124						return reject_too_many_calls(req.id);
125					}
126
127					if let Err(rejected) = limit.inner.check() {
128						tokio::time::sleep(jitter + rejected.wait_time_from(limit.clock.now()))
129							.await;
130					} else {
131						break;
132					}
133
134					is_rate_limited = true;
135					attempts += 1;
136				}
137			}
138
139			let rp = service.call(req.clone()).await;
140			metrics.as_ref().map(|m| m.on_response(&req, &rp, is_rate_limited, now));
141
142			rp
143		}
144		.boxed()
145	}
146}
147
148fn reject_too_many_calls(id: Id) -> MethodResponse {
149	MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>))
150}