jsonrpsee_server/
utils.rs1use std::future::Future;
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use crate::{HttpBody, HttpRequest};
32
33use futures_util::future::{self, Either};
34use hyper_util::rt::{TokioExecutor, TokioIo};
35use jsonrpsee_core::BoxError;
36use pin_project::pin_project;
37use tower::util::Oneshot;
38use tower::ServiceExt;
39
40#[derive(Debug, Copy, Clone)]
41pub(crate) struct TowerToHyperService<S> {
42 service: S,
43}
44
45impl<S> TowerToHyperService<S> {
46 pub(crate) fn new(service: S) -> Self {
47 Self { service }
48 }
49}
50
51impl<S> hyper::service::Service<HttpRequest<hyper::body::Incoming>> for TowerToHyperService<S>
52where
53 S: tower::Service<HttpRequest> + Clone,
54{
55 type Response = S::Response;
56 type Error = S::Error;
57 type Future = TowerToHyperServiceFuture<S, HttpRequest>;
58
59 fn call(&self, req: HttpRequest<hyper::body::Incoming>) -> Self::Future {
60 let req = req.map(HttpBody::new);
61 TowerToHyperServiceFuture { future: self.service.clone().oneshot(req) }
62 }
63}
64
65#[pin_project]
66pub(crate) struct TowerToHyperServiceFuture<S, R>
67where
68 S: tower::Service<R>,
69{
70 #[pin]
71 future: Oneshot<S, R>,
72}
73
74impl<S, R> std::future::Future for TowerToHyperServiceFuture<S, R>
75where
76 S: tower::Service<R>,
77{
78 type Output = Result<S::Response, S::Error>;
79
80 #[inline]
81 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82 self.project().future.poll(cx)
83 }
84}
85
86pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
91where
92 S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
93 S::Future: Send,
94 S::Response: Send,
95 S::Error: Into<BoxError>,
96 B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
97 B::Error: Into<BoxError>,
98 I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
99{
100 let service = hyper_util::service::TowerToHyperService::new(service);
101 let io = TokioIo::new(io);
102
103 let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
104 let conn = builder.serve_connection_with_upgrades(io, service);
105 conn.await
106}
107
108pub async fn serve_with_graceful_shutdown<S, B, I>(
111 io: I,
112 service: S,
113 stopped: impl Future<Output = ()>,
114) -> Result<(), BoxError>
115where
116 S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
117 S::Future: Send,
118 S::Response: Send,
119 S::Error: Into<BoxError>,
120 B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
121 B::Error: Into<BoxError>,
122 I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
123{
124 let service = hyper_util::service::TowerToHyperService::new(service);
125 let io = TokioIo::new(io);
126
127 let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
128 let conn = builder.serve_connection_with_upgrades(io, service);
129
130 tokio::pin!(stopped, conn);
131
132 match future::select(conn, stopped).await {
133 Either::Left((conn, _)) => conn,
135 Either::Right((_, mut conn)) => {
138 conn.as_mut().graceful_shutdown();
139 conn.await
140 }
141 }
142}
143
144pub(crate) mod deserialize {
146 pub(crate) fn from_slice_with_extensions(
148 data: &[u8],
149 extensions: http::Extensions,
150 ) -> Result<jsonrpsee_types::Request, serde_json::Error> {
151 let mut req: jsonrpsee_types::Request = serde_json::from_slice(data)?;
152 *req.extensions_mut() = extensions;
153 Ok(req)
154 }
155
156 pub(crate) fn from_str_with_extensions(
158 data: &str,
159 extensions: http::Extensions,
160 ) -> Result<jsonrpsee_types::Request, serde_json::Error> {
161 let mut req: jsonrpsee_types::Request = serde_json::from_str(data)?;
162 *req.extensions_mut() = extensions;
163 Ok(req)
164 }
165}