jsonrpsee_server/
utils.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::future::Future;
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use crate::{HttpBody, HttpRequest};
32
33use futures_util::future::{self, Either};
34use hyper_util::rt::{TokioExecutor, TokioIo};
35use jsonrpsee_core::BoxError;
36use pin_project::pin_project;
37use tower::util::Oneshot;
38use tower::ServiceExt;
39
40#[derive(Debug, Copy, Clone)]
41pub(crate) struct TowerToHyperService<S> {
42	service: S,
43}
44
45impl<S> TowerToHyperService<S> {
46	pub(crate) fn new(service: S) -> Self {
47		Self { service }
48	}
49}
50
51impl<S> hyper::service::Service<HttpRequest<hyper::body::Incoming>> for TowerToHyperService<S>
52where
53	S: tower::Service<HttpRequest> + Clone,
54{
55	type Response = S::Response;
56	type Error = S::Error;
57	type Future = TowerToHyperServiceFuture<S, HttpRequest>;
58
59	fn call(&self, req: HttpRequest<hyper::body::Incoming>) -> Self::Future {
60		let req = req.map(HttpBody::new);
61		TowerToHyperServiceFuture { future: self.service.clone().oneshot(req) }
62	}
63}
64
65#[pin_project]
66pub(crate) struct TowerToHyperServiceFuture<S, R>
67where
68	S: tower::Service<R>,
69{
70	#[pin]
71	future: Oneshot<S, R>,
72}
73
74impl<S, R> std::future::Future for TowerToHyperServiceFuture<S, R>
75where
76	S: tower::Service<R>,
77{
78	type Output = Result<S::Response, S::Error>;
79
80	#[inline]
81	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82		self.project().future.poll(cx)
83	}
84}
85
86/// Serve a service over a TCP connection without graceful shutdown.
87/// This means that pending requests will be dropped when the server is stopped.
88///
89/// If you want to gracefully shutdown the server, use [`serve_with_graceful_shutdown`] instead.
90pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
91where
92	S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
93	S::Future: Send,
94	S::Response: Send,
95	S::Error: Into<BoxError>,
96	B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
97	B::Error: Into<BoxError>,
98	I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
99{
100	let service = hyper_util::service::TowerToHyperService::new(service);
101	let io = TokioIo::new(io);
102
103	let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
104	let conn = builder.serve_connection_with_upgrades(io, service);
105	conn.await
106}
107
108/// Serve a service over a TCP connection with graceful shutdown.
109/// This means that pending requests will be completed before the server is stopped.
110pub async fn serve_with_graceful_shutdown<S, B, I>(
111	io: I,
112	service: S,
113	stopped: impl Future<Output = ()>,
114) -> Result<(), BoxError>
115where
116	S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
117	S::Future: Send,
118	S::Response: Send,
119	S::Error: Into<BoxError>,
120	B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
121	B::Error: Into<BoxError>,
122	I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
123{
124	let service = hyper_util::service::TowerToHyperService::new(service);
125	let io = TokioIo::new(io);
126
127	let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
128	let conn = builder.serve_connection_with_upgrades(io, service);
129
130	tokio::pin!(stopped, conn);
131
132	match future::select(conn, stopped).await {
133		// Return if the connection was completed.
134		Either::Left((conn, _)) => conn,
135		// If the server is stopped, we should gracefully shutdown
136		// the connection and poll it until it finishes.
137		Either::Right((_, mut conn)) => {
138			conn.as_mut().graceful_shutdown();
139			conn.await
140		}
141	}
142}
143
144/// Helpers to deserialize a request with extensions.
145pub(crate) mod deserialize {
146	/// Helper to deserialize a request with extensions.
147	pub(crate) fn from_slice_with_extensions(
148		data: &[u8],
149		extensions: http::Extensions,
150	) -> Result<jsonrpsee_types::Request, serde_json::Error> {
151		let mut req: jsonrpsee_types::Request = serde_json::from_slice(data)?;
152		*req.extensions_mut() = extensions;
153		Ok(req)
154	}
155
156	/// Helper to deserialize a request with extensions.
157	pub(crate) fn from_str_with_extensions(
158		data: &str,
159		extensions: http::Extensions,
160	) -> Result<jsonrpsee_types::Request, serde_json::Error> {
161		let mut req: jsonrpsee_types::Request = serde_json::from_str(data)?;
162		*req.extensions_mut() = extensions;
163		Ok(req)
164	}
165}