sc_utils/notification.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! Provides an mpsc notification channel that can be instantiated
//! _after_ it has been shared with the consumer and producer entities.
21//!
22//! Useful when building RPC extensions where, at service definition time, we
23//! don't know whether the specific interface where the RPC extension will be
24//! exposed is safe or not and we want to lazily build the RPC extension
25//! whenever we bind the service to an interface.
26//!
27//! See [`sc-service::builder::RpcExtensionBuilder`] for more details.
28
29use futures::stream::{FusedStream, Stream};
30use std::{
31 pin::Pin,
32 task::{Context, Poll},
33};
34
35use crate::pubsub::{Hub, Receiver};
36
37mod registry;
38use registry::Registry;
39
40#[cfg(test)]
41mod tests;
42
/// Trait used to define the "tracing key" string used to tag
/// and identify the mpsc channels.
///
/// Implementors serve as the `TK` type parameter of [`NotificationStream`],
/// whose `channel()` constructor hands [`TracingKeyStr::TRACING_KEY`] to the
/// underlying hub when it is created.
pub trait TracingKeyStr {
    /// Const `str` representing the "tracing key" used to tag and identify
    /// the mpsc channels owned by the object implementing this trait.
    const TRACING_KEY: &'static str;
}
50
51/// The receiving half of the notifications channel.
52///
53/// The [`NotificationStream`] entity stores the [`Hub`] so it can be
54/// used to add more subscriptions.
55#[derive(Clone)]
56pub struct NotificationStream<Payload, TK: TracingKeyStr> {
57 hub: Hub<Payload, Registry>,
58 _pd: std::marker::PhantomData<TK>,
59}
60
/// The receiving half of the notifications channel(s).
///
/// Obtained from `NotificationStream::subscribe`. It wraps the pubsub
/// [`Receiver`] returned by the hub and is consumed as a `Stream` of
/// `Payload` items.
#[derive(Debug)]
pub struct NotificationReceiver<Payload> {
    // Subscription handle handed out by the hub; all polling is delegated to it.
    receiver: Receiver<Payload, Registry>,
}
66
/// The sending half of the notifications channel(s).
///
/// Created together with a [`NotificationStream`] by
/// `NotificationStream::channel`, which gives the sender a clone of the
/// stream's [`Hub`].
pub struct NotificationSender<Payload> {
    // Clone of the hub held by the paired `NotificationStream`.
    hub: Hub<Payload, Registry>,
}
71
72impl<Payload, TK: TracingKeyStr> NotificationStream<Payload, TK> {
73 /// Creates a new pair of receiver and sender of `Payload` notifications.
74 pub fn channel() -> (NotificationSender<Payload>, Self) {
75 let hub = Hub::new(TK::TRACING_KEY);
76 let sender = NotificationSender { hub: hub.clone() };
77 let receiver = NotificationStream { hub, _pd: Default::default() };
78 (sender, receiver)
79 }
80
81 /// Subscribe to a channel through which the generic payload can be received.
82 pub fn subscribe(&self, queue_size_warning: usize) -> NotificationReceiver<Payload> {
83 let receiver = self.hub.subscribe((), queue_size_warning);
84 NotificationReceiver { receiver }
85 }
86}
87
impl<Payload> NotificationSender<Payload> {
    /// Send out a notification to all subscribers that a new payload is available for a
    /// block.
    ///
    /// The payload is supplied as a fallible closure rather than a ready value.
    /// The closure is handed to the hub's `send` as-is, and the `Result` that
    /// `send` produces is returned to the caller unchanged.
    ///
    /// `Payload: Clone` is required by this method only, not by the type.
    ///
    /// NOTE(review): presumably the closure form lets the hub avoid building
    /// the payload when nobody is subscribed, and an `Err` originates from the
    /// closure itself — confirm against `Hub::send`.
    pub fn notify<Error>(
        &self,
        payload: impl FnOnce() -> Result<Payload, Error>,
    ) -> Result<(), Error>
    where
        Payload: Clone,
    {
        self.hub.send(payload)
    }
}
101
102impl<Payload> Clone for NotificationSender<Payload> {
103 fn clone(&self) -> Self {
104 Self { hub: self.hub.clone() }
105 }
106}
107
// Declared for every `Payload` so that `poll_next` may call `Pin::get_mut`,
// which is only available when `Self: Unpin`.
impl<Payload> Unpin for NotificationReceiver<Payload> {}
109
110impl<Payload> Stream for NotificationReceiver<Payload> {
111 type Item = Payload;
112
113 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Payload>> {
114 Pin::new(&mut self.get_mut().receiver).poll_next(cx)
115 }
116}
117
118impl<Payload> FusedStream for NotificationReceiver<Payload> {
119 fn is_terminated(&self) -> bool {
120 self.receiver.is_terminated()
121 }
122}