sc_utils/
notification.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Provides mpsc notification channel that can be instantiated
20//! _after_ it's been shared to the consumer and producers entities.
21//!
22//! Useful when building RPC extensions where, at service definition time, we
23//! don't know whether the specific interface where the RPC extension will be
24//! exposed is safe or not and we want to lazily build the RPC extension
25//! whenever we bind the service to an interface.
26//!
27//! See [`sc-service::builder::RpcExtensionBuilder`] for more details.
28
29use futures::stream::{FusedStream, Stream};
30use std::{
31	pin::Pin,
32	task::{Context, Poll},
33};
34
35use crate::pubsub::{Hub, Receiver};
36
37mod registry;
38use registry::Registry;
39
40#[cfg(test)]
41mod tests;
42
43/// Trait used to define the "tracing key" string used to tag
44/// and identify the mpsc channels.
45pub trait TracingKeyStr {
46	/// Const `str` representing the "tracing key" used to tag and identify
47	/// the mpsc channels owned by the object implementing this trait.
48	const TRACING_KEY: &'static str;
49}
50
51/// The receiving half of the notifications channel.
52///
53/// The [`NotificationStream`] entity stores the [`Hub`] so it can be
54/// used to add more subscriptions.
55#[derive(Clone)]
56pub struct NotificationStream<Payload, TK: TracingKeyStr> {
57	hub: Hub<Payload, Registry>,
58	_pd: std::marker::PhantomData<TK>,
59}
60
61/// The receiving half of the notifications channel(s).
62#[derive(Debug)]
63pub struct NotificationReceiver<Payload> {
64	receiver: Receiver<Payload, Registry>,
65}
66
67/// The sending half of the notifications channel(s).
68pub struct NotificationSender<Payload> {
69	hub: Hub<Payload, Registry>,
70}
71
72impl<Payload, TK: TracingKeyStr> NotificationStream<Payload, TK> {
73	/// Creates a new pair of receiver and sender of `Payload` notifications.
74	pub fn channel() -> (NotificationSender<Payload>, Self) {
75		let hub = Hub::new(TK::TRACING_KEY);
76		let sender = NotificationSender { hub: hub.clone() };
77		let receiver = NotificationStream { hub, _pd: Default::default() };
78		(sender, receiver)
79	}
80
81	/// Subscribe to a channel through which the generic payload can be received.
82	pub fn subscribe(&self, queue_size_warning: usize) -> NotificationReceiver<Payload> {
83		let receiver = self.hub.subscribe((), queue_size_warning);
84		NotificationReceiver { receiver }
85	}
86}
87
88impl<Payload> NotificationSender<Payload> {
89	/// Send out a notification to all subscribers that a new payload is available for a
90	/// block.
91	pub fn notify<Error>(
92		&self,
93		payload: impl FnOnce() -> Result<Payload, Error>,
94	) -> Result<(), Error>
95	where
96		Payload: Clone,
97	{
98		self.hub.send(payload)
99	}
100}
101
102impl<Payload> Clone for NotificationSender<Payload> {
103	fn clone(&self) -> Self {
104		Self { hub: self.hub.clone() }
105	}
106}
107
108impl<Payload> Unpin for NotificationReceiver<Payload> {}
109
110impl<Payload> Stream for NotificationReceiver<Payload> {
111	type Item = Payload;
112
113	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Payload>> {
114		Pin::new(&mut self.get_mut().receiver).poll_next(cx)
115	}
116}
117
118impl<Payload> FusedStream for NotificationReceiver<Payload> {
119	fn is_terminated(&self) -> bool {
120		self.receiver.is_terminated()
121	}
122}