jsonrpsee_server/
future.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Utilities for handling async code.
28
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use futures_util::{Future, Stream, StreamExt};
34use pin_project::pin_project;
35use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError};
36use tokio::time::Interval;
37use tokio_stream::wrappers::BroadcastStream;
38
39/// Create channel to determine whether
40/// the server shall continue to run or not.
41pub fn stop_channel() -> (StopHandle, ServerHandle) {
42	let (tx, rx) = tokio::sync::watch::channel(());
43	(StopHandle::new(rx), ServerHandle::new(tx))
44}
45
46/// Represent a stop handle which is a wrapper over a `multi-consumer receiver`
47/// and cloning [`StopHandle`] will get a separate instance of the underlying receiver.
48#[derive(Debug, Clone)]
49pub struct StopHandle(watch::Receiver<()>);
50
51impl StopHandle {
52	/// Create a new stop handle.
53	pub(crate) fn new(rx: watch::Receiver<()>) -> Self {
54		Self(rx)
55	}
56
57	/// A future that resolves when server has been stopped
58	/// it consumes the stop handle.
59	pub async fn shutdown(mut self) {
60		let _ = self.0.changed().await;
61	}
62}
63
64/// Error when the server has already been stopped.
65#[derive(Debug, Copy, Clone, thiserror::Error)]
66#[error("The server is already stopped")]
67pub struct AlreadyStoppedError;
68
69/// Server handle.
70///
71/// When all [`StopHandle`]'s have been `dropped` or `stop` has been called
72/// the server will be stopped.
73#[derive(Debug, Clone)]
74pub struct ServerHandle(Arc<watch::Sender<()>>);
75
76impl ServerHandle {
77	/// Create a new server handle.
78	pub(crate) fn new(tx: watch::Sender<()>) -> Self {
79		Self(Arc::new(tx))
80	}
81
82	/// Tell the server to stop without waiting for the server to stop.
83	pub fn stop(&self) -> Result<(), AlreadyStoppedError> {
84		self.0.send(()).map_err(|_| AlreadyStoppedError)
85	}
86
87	/// Wait for the server to stop.
88	pub async fn stopped(self) {
89		self.0.closed().await
90	}
91
92	/// Check if the server has been stopped.
93	pub fn is_stopped(&self) -> bool {
94		self.0.is_closed()
95	}
96}
97
98/// Limits the number of connections.
99#[derive(Clone, Debug)]
100pub struct ConnectionGuard {
101	inner: Arc<Semaphore>,
102	max: usize,
103}
104
105impl ConnectionGuard {
106	/// Create a new connection guard.
107	pub fn new(limit: usize) -> Self {
108		Self { inner: Arc::new(Semaphore::new(limit)), max: limit }
109	}
110
111	/// Acquire a connection permit.
112	pub fn try_acquire(&self) -> Option<ConnectionPermit> {
113		match self.inner.clone().try_acquire_owned() {
114			Ok(guard) => Some(guard),
115			Err(TryAcquireError::Closed) => unreachable!("Semaphore::Close is never called and can't be closed; qed"),
116			Err(TryAcquireError::NoPermits) => None,
117		}
118	}
119
120	/// Get the number of available connection slots.
121	pub fn available_connections(&self) -> usize {
122		self.inner.available_permits()
123	}
124
125	/// Get the maximum number of connections.
126	pub fn max_connections(&self) -> usize {
127		self.max
128	}
129}
130
131/// Connection permit.
132pub type ConnectionPermit = OwnedSemaphorePermit;
133
134#[pin_project]
135pub(crate) struct IntervalStream(#[pin] Option<tokio_stream::wrappers::IntervalStream>);
136
137impl IntervalStream {
138	/// Creates a stream which never returns any elements.
139	pub(crate) fn pending() -> Self {
140		Self(None)
141	}
142
143	/// Creates a stream which produces elements with interval of `period`.
144	pub(crate) fn new(interval: Interval) -> Self {
145		Self(Some(tokio_stream::wrappers::IntervalStream::new(interval)))
146	}
147}
148
149impl Stream for IntervalStream {
150	type Item = tokio::time::Instant;
151
152	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
153		if let Some(mut stream) = self.project().0.as_pin_mut() {
154			stream.poll_next_unpin(cx)
155		} else {
156			// NOTE: this will not be woken up again and it's by design
157			// to be a pending stream that never returns.
158			Poll::Pending
159		}
160	}
161}
162
163#[derive(Debug, Clone)]
164pub(crate) struct SessionClose(tokio::sync::broadcast::Sender<()>);
165
166impl SessionClose {
167	pub(crate) fn close(self) {
168		let _ = self.0.send(());
169	}
170
171	pub(crate) fn closed(&self) -> SessionClosedFuture {
172		SessionClosedFuture(BroadcastStream::new(self.0.subscribe()))
173	}
174}
175
176/// A future that resolves when the connection has been closed.
177#[derive(Debug)]
178pub struct SessionClosedFuture(BroadcastStream<()>);
179
180impl Future for SessionClosedFuture {
181	type Output = ();
182
183	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184		match self.0.poll_next_unpin(cx) {
185			Poll::Pending => Poll::Pending,
186			// Only message is only sent and
187			// ignore can't keep up errors.
188			Poll::Ready(_) => Poll::Ready(()),
189		}
190	}
191}
192
193pub(crate) fn session_close() -> (SessionClose, SessionClosedFuture) {
194	// SessionClosedFuture is closed after one message has been recevied
195	// and max one message is handled then it's closed.
196	let (tx, rx) = tokio::sync::broadcast::channel(1);
197	(SessionClose(tx), SessionClosedFuture(BroadcastStream::new(rx)))
198}