sc_rpc_server/middleware/
node_health.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Middleware for handling `/health` and `/health/readiness` endpoints.
20
21use std::{
22	error::Error,
23	future::Future,
24	pin::Pin,
25	task::{Context, Poll},
26};
27
28use futures::future::FutureExt;
29use http::{HeaderValue, Method, StatusCode, Uri};
30use jsonrpsee::{
31	server::{HttpBody, HttpRequest, HttpResponse},
32	types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess},
33};
34use tower::Service;
35
36const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#;
37const HEADER_VALUE_JSON: HeaderValue = HeaderValue::from_static("application/json; charset=utf-8");
38
39/// Layer that applies [`NodeHealthProxy`] which
40/// proxies `/health` and `/health/readiness` endpoints.
41#[derive(Debug, Clone, Default)]
42pub struct NodeHealthProxyLayer;
43
44impl<S> tower::Layer<S> for NodeHealthProxyLayer {
45	type Service = NodeHealthProxy<S>;
46
47	fn layer(&self, service: S) -> Self::Service {
48		NodeHealthProxy::new(service)
49	}
50}
51
52/// Middleware that proxies `/health` and `/health/readiness` endpoints.
53pub struct NodeHealthProxy<S>(S);
54
55impl<S> NodeHealthProxy<S> {
56	/// Creates a new [`NodeHealthProxy`].
57	pub fn new(service: S) -> Self {
58		Self(service)
59	}
60}
61
62impl<S> tower::Service<http::Request<hyper::body::Incoming>> for NodeHealthProxy<S>
63where
64	S: Service<HttpRequest, Response = HttpResponse>,
65	S::Response: 'static,
66	S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
67	S::Future: Send + 'static,
68{
69	type Response = S::Response;
70	type Error = Box<dyn Error + Send + Sync + 'static>;
71	type Future =
72		Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
73
74	fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75		self.0.poll_ready(cx).map_err(Into::into)
76	}
77
78	fn call(&mut self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
79		let mut req = req.map(|body| HttpBody::new(body));
80		let maybe_intercept = InterceptRequest::from_http(&req);
81
82		// Modify the request and proxy it to `system_health`
83		if let InterceptRequest::Health | InterceptRequest::Readiness = maybe_intercept {
84			// RPC methods are accessed with `POST`.
85			*req.method_mut() = Method::POST;
86			// Precautionary remove the URI.
87			*req.uri_mut() = Uri::from_static("/");
88
89			// Requests must have the following headers:
90			req.headers_mut().insert(http::header::CONTENT_TYPE, HEADER_VALUE_JSON);
91			req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON);
92
93			// Adjust the body to reflect the method call.
94			req = req.map(|_| HttpBody::from(RPC_SYSTEM_HEALTH_CALL));
95		}
96
97		// Call the inner service and get a future that resolves to the response.
98		let fut = self.0.call(req);
99
100		async move {
101			Ok(match maybe_intercept {
102				InterceptRequest::Deny =>
103					http_response(StatusCode::METHOD_NOT_ALLOWED, HttpBody::empty()),
104				InterceptRequest::No => fut.await.map_err(|err| err.into())?,
105				InterceptRequest::Health => {
106					let res = fut.await.map_err(|err| err.into())?;
107					let health = parse_rpc_response(res.into_body()).await?;
108					http_ok_response(serde_json::to_string(&health)?)
109				},
110				InterceptRequest::Readiness => {
111					let res = fut.await.map_err(|err| err.into())?;
112					let health = parse_rpc_response(res.into_body()).await?;
113					if (!health.is_syncing && health.peers > 0) || !health.should_have_peers {
114						http_ok_response(HttpBody::empty())
115					} else {
116						http_internal_error()
117					}
118				},
119			})
120		}
121		.boxed()
122	}
123}
124
125// NOTE: This is duplicated here to avoid dependency to the `RPC API`.
126#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
127#[serde(rename_all = "camelCase")]
128struct Health {
129	/// Number of connected peers
130	pub peers: usize,
131	/// Is the node syncing
132	pub is_syncing: bool,
133	/// Should this node have any peers
134	///
135	/// Might be false for local chains or when running without discovery.
136	pub should_have_peers: bool,
137}
138
139fn http_ok_response<S: Into<HttpBody>>(body: S) -> HttpResponse {
140	http_response(StatusCode::OK, body)
141}
142
143fn http_response<S: Into<HttpBody>>(status_code: StatusCode, body: S) -> HttpResponse {
144	HttpResponse::builder()
145		.status(status_code)
146		.header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON)
147		.body(body.into())
148		.expect("Header is valid; qed")
149}
150
151fn http_internal_error() -> HttpResponse {
152	http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, HttpBody::empty())
153}
154
155async fn parse_rpc_response(
156	body: HttpBody,
157) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
158	use http_body_util::BodyExt;
159
160	let bytes = body.collect().await?.to_bytes();
161
162	let raw_rp = serde_json::from_slice::<RpcResponse<Health>>(&bytes)?;
163	let rp = RpcResponseSuccess::<Health>::try_from(raw_rp)?;
164
165	Ok(rp.result)
166}
167
168/// Whether the request should be treated as ordinary RPC call or be modified.
169enum InterceptRequest {
170	/// Proxy `/health` to `system_health`.
171	Health,
172	/// Checks if node has at least one peer and is not doing major syncing.
173	///
174	/// Returns HTTP status code 200 on success otherwise HTTP status code 500 is returned.
175	Readiness,
176	/// Treat as a ordinary RPC call and don't modify the request or response.
177	No,
178	/// Deny health or readiness calls that is not HTTP GET request.
179	///
180	/// Returns HTTP status code 405.
181	Deny,
182}
183
184impl InterceptRequest {
185	fn from_http(req: &HttpRequest) -> InterceptRequest {
186		match req.uri().path() {
187			"/health" =>
188				if req.method() == http::Method::GET {
189					InterceptRequest::Health
190				} else {
191					InterceptRequest::Deny
192				},
193			"/health/readiness" =>
194				if req.method() == http::Method::GET {
195					InterceptRequest::Readiness
196				} else {
197					InterceptRequest::Deny
198				},
199			// Forward all other requests to the RPC server.
200			_ => InterceptRequest::No,
201		}
202	}
203}