jsonrpsee_server/
future.rs1use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use futures_util::{Future, Stream, StreamExt};
34use pin_project::pin_project;
35use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError};
36use tokio::time::Interval;
37use tokio_stream::wrappers::BroadcastStream;
38
39pub fn stop_channel() -> (StopHandle, ServerHandle) {
42 let (tx, rx) = tokio::sync::watch::channel(());
43 (StopHandle::new(rx), ServerHandle::new(tx))
44}
45
46#[derive(Debug, Clone)]
49pub struct StopHandle(watch::Receiver<()>);
50
51impl StopHandle {
52 pub(crate) fn new(rx: watch::Receiver<()>) -> Self {
54 Self(rx)
55 }
56
57 pub async fn shutdown(mut self) {
60 let _ = self.0.changed().await;
61 }
62}
63
64#[derive(Debug, Copy, Clone, thiserror::Error)]
66#[error("The server is already stopped")]
67pub struct AlreadyStoppedError;
68
69#[derive(Debug, Clone)]
74pub struct ServerHandle(Arc<watch::Sender<()>>);
75
76impl ServerHandle {
77 pub(crate) fn new(tx: watch::Sender<()>) -> Self {
79 Self(Arc::new(tx))
80 }
81
82 pub fn stop(&self) -> Result<(), AlreadyStoppedError> {
84 self.0.send(()).map_err(|_| AlreadyStoppedError)
85 }
86
87 pub async fn stopped(self) {
89 self.0.closed().await
90 }
91
92 pub fn is_stopped(&self) -> bool {
94 self.0.is_closed()
95 }
96}
97
98#[derive(Clone, Debug)]
100pub struct ConnectionGuard {
101 inner: Arc<Semaphore>,
102 max: usize,
103}
104
105impl ConnectionGuard {
106 pub fn new(limit: usize) -> Self {
108 Self { inner: Arc::new(Semaphore::new(limit)), max: limit }
109 }
110
111 pub fn try_acquire(&self) -> Option<ConnectionPermit> {
113 match self.inner.clone().try_acquire_owned() {
114 Ok(guard) => Some(guard),
115 Err(TryAcquireError::Closed) => unreachable!("Semaphore::Close is never called and can't be closed; qed"),
116 Err(TryAcquireError::NoPermits) => None,
117 }
118 }
119
120 pub fn available_connections(&self) -> usize {
122 self.inner.available_permits()
123 }
124
125 pub fn max_connections(&self) -> usize {
127 self.max
128 }
129}
130
131pub type ConnectionPermit = OwnedSemaphorePermit;
133
134#[pin_project]
135pub(crate) struct IntervalStream(#[pin] Option<tokio_stream::wrappers::IntervalStream>);
136
137impl IntervalStream {
138 pub(crate) fn pending() -> Self {
140 Self(None)
141 }
142
143 pub(crate) fn new(interval: Interval) -> Self {
145 Self(Some(tokio_stream::wrappers::IntervalStream::new(interval)))
146 }
147}
148
149impl Stream for IntervalStream {
150 type Item = tokio::time::Instant;
151
152 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
153 if let Some(mut stream) = self.project().0.as_pin_mut() {
154 stream.poll_next_unpin(cx)
155 } else {
156 Poll::Pending
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
164pub(crate) struct SessionClose(tokio::sync::broadcast::Sender<()>);
165
166impl SessionClose {
167 pub(crate) fn close(self) {
168 let _ = self.0.send(());
169 }
170
171 pub(crate) fn closed(&self) -> SessionClosedFuture {
172 SessionClosedFuture(BroadcastStream::new(self.0.subscribe()))
173 }
174}
175
176#[derive(Debug)]
178pub struct SessionClosedFuture(BroadcastStream<()>);
179
180impl Future for SessionClosedFuture {
181 type Output = ();
182
183 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184 match self.0.poll_next_unpin(cx) {
185 Poll::Pending => Poll::Pending,
186 Poll::Ready(_) => Poll::Ready(()),
189 }
190 }
191}
192
193pub(crate) fn session_close() -> (SessionClose, SessionClosedFuture) {
194 let (tx, rx) = tokio::sync::broadcast::channel(1);
197 (SessionClose(tx), SessionClosedFuture(BroadcastStream::new(rx)))
198}