jsonrpsee_core/server/
rpc_module.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::collections::hash_map::Entry;
28use std::fmt::{self, Debug};
29use std::future::Future;
30use std::ops::{Deref, DerefMut};
31use std::sync::Arc;
32
33use crate::error::RegisterMethodError;
34use crate::id_providers::RandomIntegerIdProvider;
35use crate::server::helpers::MethodSink;
36use crate::server::method_response::MethodResponse;
37use crate::server::subscription::{
38	sub_message_to_json, BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink,
39	SubNotifResultOrError, Subscribers, Subscription, SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit,
40	SubscriptionState,
41};
42use crate::server::{ResponsePayload, LOG_TARGET};
43use crate::traits::ToRpcParams;
44use futures_util::{future::BoxFuture, FutureExt};
45use http::Extensions;
46use jsonrpsee_types::error::{ErrorCode, ErrorObject};
47use jsonrpsee_types::{
48	ErrorObjectOwned, Id, Params, Request, Response, ResponseSuccess, SubscriptionId as RpcSubscriptionId,
49};
50use rustc_hash::FxHashMap;
51use serde::de::DeserializeOwned;
52use tokio::sync::{mpsc, oneshot};
53
54use super::IntoResponse;
55
56/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
57/// implemented as a function pointer to a `Fn` function taking four arguments:
58/// the `id`, `params`, a channel the function uses to communicate the result (or error)
59/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
60pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize, Extensions) -> MethodResponse>;
61/// Similar to [`SyncMethod`], but represents an asynchronous handler.
62pub type AsyncMethod<'a> = Arc<
63	dyn Send
64		+ Sync
65		+ Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize, Extensions) -> BoxFuture<'a, MethodResponse>,
66>;
67
68/// Method callback for subscriptions.
69pub type SubscriptionMethod<'a> =
70	Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, SubscriptionState, Extensions) -> BoxFuture<'a, MethodResponse>>;
71// Method callback to unsubscribe.
72type UnsubscriptionMethod =
73	Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize, Extensions) -> MethodResponse>;
74
75/// Connection ID.
76#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default, serde::Deserialize, serde::Serialize)]
77pub struct ConnectionId(pub usize);
78
79impl From<u32> for ConnectionId {
80	fn from(id: u32) -> Self {
81		Self(id as usize)
82	}
83}
84
85impl From<usize> for ConnectionId {
86	fn from(id: usize) -> Self {
87		Self(id)
88	}
89}
90
91/// Max response size.
92pub type MaxResponseSize = usize;
93
94/// Raw response from an RPC
95/// A tuple containing:
96///   - Call result as a `String`,
97///   - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
98pub type RawRpcResponse = (String, mpsc::Receiver<String>);
99
100/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
101#[derive(thiserror::Error, Debug)]
102pub enum MethodsError {
103	/// Failed to parse the call as valid JSON-RPC.
104	#[error(transparent)]
105	Parse(#[from] serde_json::Error),
106	/// Specific JSON-RPC error.
107	#[error(transparent)]
108	JsonRpc(#[from] ErrorObjectOwned),
109	#[error("Invalid subscription ID: `{0}`")]
110	/// Invalid subscription ID.
111	InvalidSubscriptionId(String),
112}
113
114/// This represent a response to a RPC call
115/// and `Subscribe` calls are handled differently
116/// because we want to prevent subscriptions to start
117/// before the actual subscription call has been answered.
118#[derive(Debug)]
119pub enum CallOrSubscription {
120	/// The subscription callback itself sends back the result
121	/// so it must not be sent back again.
122	Subscription(MethodResponse),
123	/// Treat it as ordinary call.
124	Call(MethodResponse),
125}
126
127impl CallOrSubscription {
128	/// Extract the JSON-RPC response.
129	pub fn as_response(&self) -> &MethodResponse {
130		match &self {
131			Self::Subscription(r) => r,
132			Self::Call(r) => r,
133		}
134	}
135
136	/// Extract the JSON-RPC response.
137	pub fn into_response(self) -> MethodResponse {
138		match self {
139			Self::Subscription(r) => r,
140			Self::Call(r) => r,
141		}
142	}
143}
144
145/// Callback wrapper that can be either sync or async.
146#[derive(Clone)]
147pub enum MethodCallback {
148	/// Synchronous method handler.
149	Sync(SyncMethod),
150	/// Asynchronous method handler.
151	Async(AsyncMethod<'static>),
152	/// Subscription method handler.
153	Subscription(SubscriptionMethod<'static>),
154	/// Unsubscription method handler.
155	Unsubscription(UnsubscriptionMethod),
156}
157
158/// The kind of the JSON-RPC method call, it can be a subscription, method call or unknown.
159#[derive(Debug, Copy, Clone)]
160pub enum MethodKind {
161	/// Subscription Call.
162	Subscription,
163	/// Unsubscription Call.
164	Unsubscription,
165	/// Method call.
166	MethodCall,
167	/// The method was not found.
168	NotFound,
169}
170
171impl std::fmt::Display for MethodKind {
172	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173		let s = match self {
174			Self::Subscription => "subscription",
175			Self::MethodCall => "method call",
176			Self::NotFound => "method not found",
177			Self::Unsubscription => "unsubscription",
178		};
179
180		write!(f, "{s}")
181	}
182}
183
184/// Result of a method, either direct value or a future of one.
185pub enum MethodResult<T> {
186	/// Result by value
187	Sync(T),
188	/// Future of a value
189	Async(BoxFuture<'static, T>),
190}
191
192impl<T: Debug> Debug for MethodResult<T> {
193	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194		match self {
195			MethodResult::Sync(result) => result.fmt(f),
196			MethodResult::Async(_) => f.write_str("<future>"),
197		}
198	}
199}
200
201impl Debug for MethodCallback {
202	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203		match self {
204			Self::Async(_) => write!(f, "Async"),
205			Self::Sync(_) => write!(f, "Sync"),
206			Self::Subscription(_) => write!(f, "Subscription"),
207			Self::Unsubscription(_) => write!(f, "Unsubscription"),
208		}
209	}
210}
211
212/// Reference-counted, clone-on-write collection of synchronous and asynchronous methods.
213#[derive(Default, Debug, Clone)]
214pub struct Methods {
215	callbacks: Arc<FxHashMap<&'static str, MethodCallback>>,
216	extensions: Extensions,
217}
218
219impl Methods {
220	/// Creates a new empty [`Methods`].
221	pub fn new() -> Self {
222		Self::default()
223	}
224
225	/// Verifies that the method name is not already taken, and returns an error if it is.
226	pub fn verify_method_name(&mut self, name: &'static str) -> Result<(), RegisterMethodError> {
227		if self.callbacks.contains_key(name) {
228			return Err(RegisterMethodError::AlreadyRegistered(name.into()));
229		}
230
231		Ok(())
232	}
233
234	/// Inserts the method callback for a given name, or returns an error if the name was already taken.
235	/// On success it returns a mut reference to the [`MethodCallback`] just inserted.
236	pub fn verify_and_insert(
237		&mut self,
238		name: &'static str,
239		callback: MethodCallback,
240	) -> Result<&mut MethodCallback, RegisterMethodError> {
241		match self.mut_callbacks().entry(name) {
242			Entry::Occupied(_) => Err(RegisterMethodError::AlreadyRegistered(name.into())),
243			Entry::Vacant(vacant) => Ok(vacant.insert(callback)),
244		}
245	}
246
247	/// Helper for obtaining a mut ref to the callbacks HashMap.
248	fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> {
249		Arc::make_mut(&mut self.callbacks)
250	}
251
252	/// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other` into `self`.
253	/// Fails if any of the methods in `other` is present already.
254	pub fn merge(&mut self, other: impl Into<Methods>) -> Result<(), RegisterMethodError> {
255		let mut other = other.into();
256
257		for name in other.callbacks.keys() {
258			self.verify_method_name(name)?;
259		}
260
261		let callbacks = self.mut_callbacks();
262
263		for (name, callback) in other.mut_callbacks().drain() {
264			callbacks.insert(name, callback);
265		}
266
267		Ok(())
268	}
269
270	/// Returns the method callback.
271	pub fn method(&self, method_name: &str) -> Option<&MethodCallback> {
272		self.callbacks.get(method_name)
273	}
274
275	/// Returns the method callback along with its name. The returned name is same as the
276	/// `method_name`, but its lifetime bound is `'static`.
277	pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> {
278		self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
279	}
280
281	/// Helper to call a method on the `RPC module` without having to spin up a server.
282	///
283	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
284	///
285	/// Returns the decoded value of the `result field` in JSON-RPC response if successful.
286	///
287	/// # Examples
288	///
289	/// ```
290	/// #[tokio::main]
291	/// async fn main() {
292	///     use jsonrpsee::{RpcModule, IntoResponse};
293	///     use jsonrpsee::core::RpcResult;
294	///
295	///     let mut module = RpcModule::new(());
296	///     module.register_method::<RpcResult<u64>, _>("echo_call", |params, _, _| {
297	///         params.one::<u64>().map_err(Into::into)
298	///     }).unwrap();
299	///
300	///     let echo: u64 = module.call("echo_call", [1_u64]).await.unwrap();
301	///     assert_eq!(echo, 1);
302	/// }
303	/// ```
304	pub async fn call<Params: ToRpcParams, T: DeserializeOwned + Clone>(
305		&self,
306		method: &str,
307		params: Params,
308	) -> Result<T, MethodsError> {
309		let params = params.to_rpc_params()?;
310		let req = Request::new(method.into(), params.as_ref().map(|p| p.as_ref()), Id::Number(0));
311		tracing::trace!(target: LOG_TARGET, "[Methods::call] Method: {:?}, params: {:?}", method, params);
312		let (rp, _) = self.inner_call(req, 1, mock_subscription_permit()).await;
313
314		let rp = serde_json::from_str::<Response<T>>(&rp)?;
315		ResponseSuccess::try_from(rp).map(|s| s.result).map_err(|e| MethodsError::JsonRpc(e.into_owned()))
316	}
317
318	/// Make a request (JSON-RPC method call or subscription) by using raw JSON.
319	///
320	/// Returns the raw JSON response to the call and a stream to receive notifications if the call was a subscription.
321	///
322	/// # Examples
323	///
324	/// ```
325	/// #[tokio::main]
326	/// async fn main() {
327	///     use jsonrpsee::{RpcModule, SubscriptionMessage};
328	///     use jsonrpsee::types::{response::Success, Response};
329	///     use futures_util::StreamExt;
330	///
331	///     let mut module = RpcModule::new(());
332	///     module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async {
333	///         let sink = pending.accept().await?;
334	///
335	///         // see comment above.
336	///         sink.send("one answer".into()).await?;
337	///
338	///         Ok(())
339	///     }).unwrap();
340	///     let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#, 1).await.unwrap();
341	///     // If the response is an error converting it to `Success` will fail.
342	///     let resp: Success<u64> = serde_json::from_str::<Response<u64>>(&resp).unwrap().try_into().unwrap();
343	///     let sub_resp = stream.recv().await.unwrap();
344	///     assert_eq!(
345	///         format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
346	///         sub_resp
347	///     );
348	/// }
349	/// ```
350	pub async fn raw_json_request(
351		&self,
352		request: &str,
353		buf_size: usize,
354	) -> Result<(String, mpsc::Receiver<String>), serde_json::Error> {
355		tracing::trace!("[Methods::raw_json_request] Request: {:?}", request);
356		let req: Request = serde_json::from_str(request)?;
357		let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
358
359		Ok((resp, rx))
360	}
361
362	/// Execute a callback.
363	async fn inner_call(
364		&self,
365		req: Request<'_>,
366		buf_size: usize,
367		subscription_permit: SubscriptionPermit,
368	) -> RawRpcResponse {
369		let (tx, mut rx) = mpsc::channel(buf_size);
370		// The extensions is always empty when calling the method directly because decoding an JSON-RPC
371		// request doesn't have any extensions.
372		let Request { id, method, params, .. } = req;
373		let params = Params::new(params.as_ref().map(|params| params.as_ref().get()));
374		let max_response_size = usize::MAX;
375		let conn_id = ConnectionId(0);
376		let mut ext = self.extensions.clone();
377		ext.insert(conn_id);
378
379		let response = match self.method(&method) {
380			None => MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)),
381			Some(MethodCallback::Sync(cb)) => (cb)(id, params, max_response_size, ext),
382			Some(MethodCallback::Async(cb)) => {
383				(cb)(id.into_owned(), params.into_owned(), conn_id, max_response_size, ext).await
384			}
385			Some(MethodCallback::Subscription(cb)) => {
386				let conn_state =
387					SubscriptionState { conn_id, id_provider: &RandomIntegerIdProvider, subscription_permit };
388				let res = (cb)(id, params, MethodSink::new(tx.clone()), conn_state, ext).await;
389
390				// This message is not used because it's used for metrics so we discard in other to
391				// not read once this is used for subscriptions.
392				//
393				// The same information is part of `res` above.
394				let _ = rx.recv().await.expect("Every call must at least produce one response; qed");
395
396				res
397			}
398			Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, conn_id, max_response_size, ext),
399		};
400
401		let is_success = response.is_success();
402		let (rp, notif) = response.into_parts();
403
404		if let Some(n) = notif {
405			n.notify(is_success);
406		}
407
408		tracing::trace!(target: LOG_TARGET, "[Methods::inner_call] Method: {}, response: {}", method, rp);
409
410		(rp, rx)
411	}
412
413	/// Helper to create a subscription on the `RPC module` without having to spin up a server.
414	///
415	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
416	///
417	/// Returns [`Subscription`] on success which can used to get results from the subscription.
418	///
419	/// # Examples
420	///
421	/// ```
422	/// #[tokio::main]
423	/// async fn main() {
424	///     use jsonrpsee::{RpcModule, SubscriptionMessage};
425	///     use jsonrpsee::core::{EmptyServerParams, RpcResult};
426	///
427	///     let mut module = RpcModule::new(());
428	///     module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async move {
429	///         let sink = pending.accept().await?;
430	///         sink.send("one answer".into()).await?;
431	///         Ok(())
432	///     }).unwrap();
433	///
434	///     let mut sub = module.subscribe_unbounded("hi", EmptyServerParams::new()).await.unwrap();
435	///     // In this case we ignore the subscription ID,
436	///     let (sub_resp, _sub_id) = sub.next::<String>().await.unwrap().unwrap();
437	///     assert_eq!(&sub_resp, "one answer");
438	/// }
439	/// ```
440	pub async fn subscribe_unbounded(
441		&self,
442		sub_method: &str,
443		params: impl ToRpcParams,
444	) -> Result<Subscription, MethodsError> {
445		self.subscribe(sub_method, params, u32::MAX as usize).await
446	}
447
448	/// Similar to [`Methods::subscribe_unbounded`] but it's using a bounded channel and the buffer capacity must be
449	/// provided.
450	///
451	pub async fn subscribe(
452		&self,
453		sub_method: &str,
454		params: impl ToRpcParams,
455		buf_size: usize,
456	) -> Result<Subscription, MethodsError> {
457		let params = params.to_rpc_params()?;
458		let req = Request::new(sub_method.into(), params.as_ref().map(|p| p.as_ref()), Id::Number(0));
459
460		tracing::trace!(target: LOG_TARGET, "[Methods::subscribe] Method: {}, params: {:?}", sub_method, params);
461
462		let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
463
464		// TODO: hack around the lifetime on the `SubscriptionId` by deserialize first to serde_json::Value.
465		let as_success: ResponseSuccess<serde_json::Value> = serde_json::from_str::<Response<_>>(&resp)?.try_into()?;
466
467		let sub_id = as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.clone()))?;
468
469		Ok(Subscription { sub_id, rx })
470	}
471
472	/// Returns an `Iterator` with all the method names registered on this server.
473	pub fn method_names(&self) -> impl Iterator<Item = &'static str> + '_ {
474		self.callbacks.keys().copied()
475	}
476
477	/// Similar to [`Methods::extensions_mut`] but it's immutable.
478	pub fn extensions(&mut self) -> &Extensions {
479		&self.extensions
480	}
481
482	/// Get a mutable reference to the extensions to add or remove data from
483	/// the extensions.
484	///
485	/// This only affects direct calls to the methods and subscriptions
486	/// and can be used for example to unit test the API without a server.
487	///
488	/// # Examples
489	///
490	/// ```
491	/// #[tokio::main]
492	/// async fn main() {
493	///     use jsonrpsee::{RpcModule, IntoResponse, Extensions};
494	///     use jsonrpsee::core::RpcResult;
495	///
496	///     let mut module = RpcModule::new(());
497	///     module.register_method::<RpcResult<u64>, _>("magic_multiply", |params, _, ext| {
498	///         let magic = ext.get::<u64>().copied().unwrap();
499	///         let val = params.one::<u64>()?;
500	///         Ok(val * magic)
501	///     }).unwrap();
502	///
503	///     // inject arbitrary data into the extensions.
504	///     module.extensions_mut().insert(33_u64);
505	///
506	///     let magic: u64 = module.call("magic_multiply", [1_u64]).await.unwrap();
507	///     assert_eq!(magic, 33);
508	/// }
509	/// ```
510	pub fn extensions_mut(&mut self) -> &mut Extensions {
511		&mut self.extensions
512	}
513}
514
515impl<Context> Deref for RpcModule<Context> {
516	type Target = Methods;
517
518	fn deref(&self) -> &Methods {
519		&self.methods
520	}
521}
522
523impl<Context> DerefMut for RpcModule<Context> {
524	fn deref_mut(&mut self) -> &mut Methods {
525		&mut self.methods
526	}
527}
528
529/// Sets of JSON-RPC methods can be organized into "module"s that are in turn registered on the server or,
530/// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context
531/// argument that can be used to access data during call execution.
532#[derive(Debug, Clone)]
533pub struct RpcModule<Context> {
534	ctx: Arc<Context>,
535	methods: Methods,
536}
537
538impl<Context> RpcModule<Context> {
539	/// Create a new module with a given shared `Context`.
540	pub fn new(ctx: Context) -> Self {
541		Self::from_arc(Arc::new(ctx))
542	}
543
544	/// Create a new module from an already shared `Context`.
545	///
546	/// This is useful if `Context` needs to be shared outside of an [`RpcModule`].
547	pub fn from_arc(ctx: Arc<Context>) -> Self {
548		Self { ctx, methods: Default::default() }
549	}
550
551	/// Transform a module into an `RpcModule<()>` (unit context).
552	pub fn remove_context(self) -> RpcModule<()> {
553		let mut module = RpcModule::new(());
554		module.methods = self.methods;
555		module
556	}
557}
558
559impl<Context> From<RpcModule<Context>> for Methods {
560	fn from(module: RpcModule<Context>) -> Methods {
561		module.methods
562	}
563}
564
565impl<Context: Send + Sync + 'static> RpcModule<Context> {
566	/// Register a new synchronous RPC method, which computes the response with the given callback.
567	///
568	/// ## Examples
569	///
570	/// ```
571	/// use jsonrpsee_core::server::RpcModule;
572	///
573	/// let mut module = RpcModule::new(());
574	/// module.register_method("say_hello", |_params, _ctx, _| "lo").unwrap();
575	/// ```
576	pub fn register_method<R, F>(
577		&mut self,
578		method_name: &'static str,
579		callback: F,
580	) -> Result<&mut MethodCallback, RegisterMethodError>
581	where
582		Context: Send + Sync + 'static,
583		R: IntoResponse + 'static,
584		F: Fn(Params, &Context, &Extensions) -> R + Send + Sync + 'static,
585	{
586		let ctx = self.ctx.clone();
587		self.methods.verify_and_insert(
588			method_name,
589			MethodCallback::Sync(Arc::new(move |id, params, max_response_size, extensions| {
590				let rp = callback(params, &*ctx, &extensions).into_response();
591				MethodResponse::response(id, rp, max_response_size).with_extensions(extensions)
592			})),
593		)
594	}
595
596	/// Removes the method if it exists.
597	///
598	/// Be aware that a subscription consist of two methods, `subscribe` and `unsubscribe` and
599	/// it's the caller responsibility to remove both `subscribe` and `unsubscribe` methods for subscriptions.
600	pub fn remove_method(&mut self, method_name: &'static str) -> Option<MethodCallback> {
601		self.methods.mut_callbacks().remove(method_name)
602	}
603
604	/// Register a new asynchronous RPC method, which computes the response with the given callback.
605	///
606	/// ## Examples
607	///
608	/// ```
609	/// use jsonrpsee_core::server::RpcModule;
610	///
611	/// let mut module = RpcModule::new(());
612	/// module.register_async_method("say_hello", |_params, _ctx, _| async { "lo" }).unwrap();
613	///
614	/// ```
615	///
616	pub fn register_async_method<R, Fun, Fut>(
617		&mut self,
618		method_name: &'static str,
619		callback: Fun,
620	) -> Result<&mut MethodCallback, RegisterMethodError>
621	where
622		R: IntoResponse + 'static,
623		Fut: Future<Output = R> + Send,
624		Fun: (Fn(Params<'static>, Arc<Context>, Extensions) -> Fut) + Clone + Send + Sync + 'static,
625	{
626		let ctx = self.ctx.clone();
627		self.methods.verify_and_insert(
628			method_name,
629			MethodCallback::Async(Arc::new(move |id, params, _, max_response_size, extensions| {
630				let ctx = ctx.clone();
631				let callback = callback.clone();
632
633				// NOTE: the extensions can't be mutated at this point so
634				// it's safe to clone it.
635				let future = async move {
636					let rp = callback(params, ctx, extensions.clone()).await.into_response();
637					MethodResponse::response(id, rp, max_response_size).with_extensions(extensions)
638				};
639				future.boxed()
640			})),
641		)
642	}
643
644	/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
645	/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform
646	/// expensive computations.
647	pub fn register_blocking_method<R, F>(
648		&mut self,
649		method_name: &'static str,
650		callback: F,
651	) -> Result<&mut MethodCallback, RegisterMethodError>
652	where
653		Context: Send + Sync + 'static,
654		R: IntoResponse + 'static,
655		F: Fn(Params, Arc<Context>, Extensions) -> R + Clone + Send + Sync + 'static,
656	{
657		let ctx = self.ctx.clone();
658		let callback = self.methods.verify_and_insert(
659			method_name,
660			MethodCallback::Async(Arc::new(move |id, params, _, max_response_size, extensions| {
661				let ctx = ctx.clone();
662				let callback = callback.clone();
663
664				// NOTE: the extensions can't be mutated at this point so
665				// it's safe to clone it.
666				let extensions2 = extensions.clone();
667
668				tokio::task::spawn_blocking(move || {
669					let rp = callback(params, ctx, extensions2.clone()).into_response();
670					MethodResponse::response(id, rp, max_response_size).with_extensions(extensions2)
671				})
672				.map(|result| match result {
673					Ok(r) => r,
674					Err(err) => {
675						tracing::error!(target: LOG_TARGET, "Join error for blocking RPC method: {:?}", err);
676						MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
677							.with_extensions(extensions)
678					}
679				})
680				.boxed()
681			})),
682		)?;
683
684		Ok(callback)
685	}
686
687	/// Register a new publish/subscribe interface using JSON-RPC notifications.
688	///
689	/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
690	/// with an option to choose custom subscription ID generation.
691	///
692	/// Furthermore, it generates the `unsubscribe implementation` where a `bool` is used as
693	/// the result to indicate whether the subscription was successfully unsubscribed to or not.
694	/// For instance an `unsubscribe call` may fail if a non-existent subscription ID is used in the call.
695	///
696	/// This method ensures that the `subscription_method_name` and `unsubscription_method_name` are unique.
697	/// The `notif_method_name` argument sets the content of the `method` field in the JSON document that
698	/// the server sends back to the client. The uniqueness of this value is not machine checked and it's up to
699	/// the user to ensure it is not used in any other [`RpcModule`] used in the server.
700	///
701	/// # Arguments
702	///
703	/// * `subscription_method_name` - name of the method to call to initiate a subscription
704	/// * `notif_method_name` - name of method to be used in the subscription payload (technically a JSON-RPC
705	///   notification)
706	/// * `unsubscription_method` - name of the method to call to terminate a subscription
707	/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
708	///     - [`Params`]: JSON-RPC parameters in the subscription call.
709	///     - [`PendingSubscriptionSink`]: A pending subscription waiting to be accepted, in order to send out messages
710	///       on the subscription
711	///     - Context: Any type that can be embedded into the [`RpcModule`].
712	///
713	/// # Returns
714	///
715	/// An async block which returns something that implements [`crate::server::IntoSubscriptionCloseResponse`] which
716	/// decides what action to take when the subscription ends whether such as to sent out another message
717	/// on the subscription stream before closing down it.
718	///
719	/// NOTE: The return value is ignored if [`PendingSubscriptionSink`] hasn't been called or is unsuccessful, as the subscription
720	/// is not allowed to send out subscription notifications before the actual subscription has been established.
721	///
722	/// This is implemented for `Result<T, E>` and `()`.
723	///
724	/// It's recommended to use `Result` if you want to propagate the error as special error notification
725	/// Another option is to implement [`crate::server::IntoSubscriptionCloseResponse`] if you want customized behaviour.
726	///
727	/// The error notification has the following format:
728	///
729	/// ```json
730	/// {
731	///  "jsonrpc": "2.0",
732	///  "method": "<method>",
733	///  "params": {
734	///    "subscription": "<subscriptionID>",
735	///    "error": <your msg>
736	///    }
737	///  }
738	/// }
739	/// ```
740	///
741	/// # Examples
742	///
743	/// ```no_run
744	///
745	/// use jsonrpsee_core::server::{RpcModule, SubscriptionSink, SubscriptionMessage};
746	/// use jsonrpsee_types::ErrorObjectOwned;
747	///
748	/// let mut ctx = RpcModule::new(99_usize);
749	/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx, _| async move {
750	///
751	///     let x = match params.one::<usize>() {
752	///         Ok(x) => x,
753	///         Err(e) => {
754	///            pending.reject(ErrorObjectOwned::from(e)).await;
755	///            // If the subscription has not been "accepted" then
756	///            // the return value will be "ignored" as it's not
757	///            // allowed to send out any further notifications on
758	///            // on the subscription.
759	///            return Ok(());
760	///         }
761	///     };
762	///
763	///     // Mark the subscription is accepted after the params has been parsed successful.
764	///     // This is actually responds the underlying RPC method call and may fail if the
765	///     // connection is closed.
766	///     let sink = pending.accept().await?;
767	///     let sum = x + (*ctx);
768	///
769	///     // This will send out an error notification if it fails.
770	///     //
771	///     // If you need some other behavior implement or custom format of the error field
772	///     // you need to manually handle that.
773	///     let msg = SubscriptionMessage::from_json(&sum)?;
774	///
775	///     // This fails only if the connection is closed
776	///     sink.send(msg).await?;
777	///
778	///     Ok(())
779	/// });
780	/// ```
781	pub fn register_subscription<R, F, Fut>(
782		&mut self,
783		subscribe_method_name: &'static str,
784		notif_method_name: &'static str,
785		unsubscribe_method_name: &'static str,
786		callback: F,
787	) -> Result<&mut MethodCallback, RegisterMethodError>
788	where
789		Context: Send + Sync + 'static,
790		F: (Fn(Params<'static>, PendingSubscriptionSink, Arc<Context>, Extensions) -> Fut)
791			+ Send
792			+ Sync
793			+ Clone
794			+ 'static,
795		Fut: Future<Output = R> + Send + 'static,
796		R: IntoSubscriptionCloseResponse + Send,
797	{
798		let subscribers = self.verify_and_register_unsubscribe(subscribe_method_name, unsubscribe_method_name)?;
799		let ctx = self.ctx.clone();
800
801		// Subscribe
802		let callback = {
803			self.methods.verify_and_insert(
804				subscribe_method_name,
805				MethodCallback::Subscription(Arc::new(move |id, params, method_sink, conn, extensions| {
806					let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
807
808					// response to the subscription call.
809					let (tx, rx) = oneshot::channel();
810					let (accepted_tx, accepted_rx) = oneshot::channel();
811
812					let sub_id = uniq_sub.sub_id.clone();
813					let method = notif_method_name;
814
815					let sink = PendingSubscriptionSink {
816						inner: method_sink.clone(),
817						method: notif_method_name,
818						subscribers: subscribers.clone(),
819						uniq_sub,
820						id: id.clone().into_owned(),
821						subscribe: tx,
822						permit: conn.subscription_permit,
823					};
824
825					// The subscription callback is a future from the subscription
826					// definition and not the as same when the subscription call has been completed.
827					//
828					// This runs until the subscription callback has completed.
829					//
830					// NOTE: the extensions can't be mutated at this point so
831					// it's safe to clone it.
832					let sub_fut = callback(params.into_owned(), sink, ctx.clone(), extensions.clone());
833
834					tokio::spawn(async move {
835						// This will wait for the subscription future to be resolved
836						let response = match futures_util::future::try_join(sub_fut.map(|f| Ok(f)), accepted_rx).await {
837							Ok((r, _)) => r.into_response(),
838							// The accept call failed i.e, the subscription was not accepted.
839							Err(_) => return,
840						};
841
842						match response {
843							SubscriptionCloseResponse::Notif(msg) => {
844								let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &sub_id, method);
845								let _ = method_sink.send(json).await;
846							}
847							SubscriptionCloseResponse::NotifErr(msg) => {
848								let json = sub_message_to_json(msg, SubNotifResultOrError::Error, &sub_id, method);
849								let _ = method_sink.send(json).await;
850							}
851							SubscriptionCloseResponse::None => (),
852						}
853					});
854
855					let id = id.clone().into_owned();
856
857					Box::pin(async move {
858						let rp = match rx.await {
859							Ok(rp) => {
860								// If the subscription was accepted then send a message
861								// to subscription task otherwise rely on the drop impl.
862								if rp.is_success() {
863									let _ = accepted_tx.send(());
864								}
865								rp
866							}
867							Err(_) => MethodResponse::error(id, ErrorCode::InternalError),
868						};
869
870						rp.with_extensions(extensions)
871					})
872				})),
873			)?
874		};
875
876		Ok(callback)
877	}
878
879	/// Similar to [`RpcModule::register_subscription`] but a little lower-level API
880	/// where handling the subscription is managed the user i.e, polling the subscription
881	/// such as spawning a separate task to do so.
882	///
883	/// This is more efficient as this doesn't require cloning the `params` in the subscription
884	/// and it won't send out a close message. Such things are delegated to the user of this API
885	///
886	/// # Examples
887	///
888	/// ```no_run
889	///
890	/// use jsonrpsee_core::server::{RpcModule, SubscriptionSink, SubscriptionMessage};
891	/// use jsonrpsee_types::ErrorObjectOwned;
892	///
893	/// let mut ctx = RpcModule::new(99_usize);
894	/// ctx.register_subscription_raw("sub", "notif_name", "unsub", |params, pending, ctx, _| {
895	///
896	///     // The params are parsed outside the async block below to avoid cloning the bytes.
897	///     let val = match params.one::<usize>() {
898	///         Ok(val) => val,
899	///         Err(e) => {
900	///             // If the subscription has not been "accepted" then
901	///             // the return value will be "ignored" as it's not
902	///             // allowed to send out any further notifications on
903	///             // on the subscription.
904	///             tokio::spawn(pending.reject(ErrorObjectOwned::from(e)));
905	///             return;
906	///         }
907	///     };
908	///
909	///     tokio::spawn(async move {
910	///         // Mark the subscription is accepted after the params has been parsed successful.
911	///         // This is actually responds the underlying RPC method call and may fail if the
912	///         // connection is closed.
913	///         let sink = pending.accept().await.unwrap();
914	///         let sum = val + (*ctx);
915	///
916	///         let msg = SubscriptionMessage::from_json(&sum).unwrap();
917	///
918	///         // This fails only if the connection is closed
919	///         sink.send(msg).await.unwrap();
920	///     });
921	/// });
922	/// ```
923	///
924	pub fn register_subscription_raw<R, F>(
925		&mut self,
926		subscribe_method_name: &'static str,
927		notif_method_name: &'static str,
928		unsubscribe_method_name: &'static str,
929		callback: F,
930	) -> Result<&mut MethodCallback, RegisterMethodError>
931	where
932		Context: Send + Sync + 'static,
933		F: (Fn(Params, PendingSubscriptionSink, Arc<Context>, &Extensions) -> R) + Send + Sync + Clone + 'static,
934		R: IntoSubscriptionCloseResponse,
935	{
936		let subscribers = self.verify_and_register_unsubscribe(subscribe_method_name, unsubscribe_method_name)?;
937		let ctx = self.ctx.clone();
938
939		// Subscribe
940		let callback = {
941			self.methods.verify_and_insert(
942				subscribe_method_name,
943				MethodCallback::Subscription(Arc::new(move |id, params, method_sink, conn, extensions| {
944					let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
945
946					// response to the subscription call.
947					let (tx, rx) = oneshot::channel();
948
949					let sink = PendingSubscriptionSink {
950						inner: method_sink.clone(),
951						method: notif_method_name,
952						subscribers: subscribers.clone(),
953						uniq_sub,
954						id: id.clone().into_owned(),
955						subscribe: tx,
956						permit: conn.subscription_permit,
957					};
958
959					callback(params, sink, ctx.clone(), &extensions);
960
961					let id = id.clone().into_owned();
962
963					Box::pin(async move {
964						let rp = match rx.await {
965							Ok(rp) => rp,
966							Err(_) => MethodResponse::error(id, ErrorCode::InternalError),
967						};
968
969						rp.with_extensions(extensions)
970					})
971				})),
972			)?
973		};
974
975		Ok(callback)
976	}
977
978	/// Helper to verify the subscription can be created
979	/// and register the unsubscribe handler.
980	fn verify_and_register_unsubscribe(
981		&mut self,
982		subscribe_method_name: &'static str,
983		unsubscribe_method_name: &'static str,
984	) -> Result<Subscribers, RegisterMethodError> {
985		if subscribe_method_name == unsubscribe_method_name {
986			return Err(RegisterMethodError::SubscriptionNameConflict(subscribe_method_name.into()));
987		}
988
989		self.methods.verify_method_name(subscribe_method_name)?;
990		self.methods.verify_method_name(unsubscribe_method_name)?;
991
992		let subscribers = Subscribers::default();
993
994		// Unsubscribe
995		{
996			let subscribers = subscribers.clone();
997			self.methods.mut_callbacks().insert(
998				unsubscribe_method_name,
999				MethodCallback::Unsubscription(Arc::new(move |id, params, conn_id, max_response_size, extensions| {
1000					let sub_id = match params.one::<RpcSubscriptionId>() {
1001						Ok(sub_id) => sub_id,
1002						Err(_) => {
1003							tracing::warn!(
1004								target: LOG_TARGET,
1005								"Unsubscribe call `{}` failed: couldn't parse subscription id={:?} request id={:?}",
1006								unsubscribe_method_name,
1007								params,
1008								id
1009							);
1010
1011							return MethodResponse::response(id, ResponsePayload::success(false), max_response_size)
1012								.with_extensions(extensions);
1013						}
1014					};
1015
1016					let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
1017					let result = subscribers.lock().remove(&key).is_some();
1018
1019					if !result {
1020						tracing::debug!(
1021							target: LOG_TARGET,
1022							"Unsubscribe call `{}` subscription key={:?} not an active subscription",
1023							unsubscribe_method_name,
1024							key,
1025						);
1026					}
1027
1028					MethodResponse::response(id, ResponsePayload::success(result), max_response_size)
1029				})),
1030			);
1031		}
1032
1033		Ok(subscribers)
1034	}
1035
1036	/// Register an alias for an existing_method. Alias uniqueness is enforced.
1037	pub fn register_alias(
1038		&mut self,
1039		alias: &'static str,
1040		existing_method: &'static str,
1041	) -> Result<(), RegisterMethodError> {
1042		self.methods.verify_method_name(alias)?;
1043
1044		let callback = match self.methods.callbacks.get(existing_method) {
1045			Some(callback) => callback.clone(),
1046			None => return Err(RegisterMethodError::MethodNotFound(existing_method.into())),
1047		};
1048
1049		self.methods.mut_callbacks().insert(alias, callback);
1050
1051		Ok(())
1052	}
1053}
1054
1055fn mock_subscription_permit() -> SubscriptionPermit {
1056	BoundedSubscriptions::new(1).acquire().expect("1 permit should exist; qed")
1057}