sc_network/protocol/notifications/service/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Notification service implementation.
20
21use crate::{
22	error,
23	protocol::notifications::handler::NotificationsSink,
24	service::{
25		metrics::NotificationMetrics,
26		traits::{
27			Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
28		},
29	},
30	types::ProtocolName,
31};
32
33use futures::{
34	stream::{FuturesUnordered, Stream},
35	StreamExt,
36};
37use libp2p::PeerId;
38use parking_lot::Mutex;
39use tokio::sync::{mpsc, oneshot};
40use tokio_stream::wrappers::ReceiverStream;
41
42use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
43
44use std::{collections::HashMap, fmt::Debug, sync::Arc};
45
46pub(crate) mod metrics;
47
48#[cfg(test)]
49mod tests;
50
/// Target string attached to the log records emitted by this module.
const LOG_TARGET: &str = "sub-libp2p";

/// Capacity of the bounded command channel created by [`notification_service`].
const COMMAND_QUEUE_SIZE: usize = 64;
56
57/// Type representing subscribers of a notification protocol.
58type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;
59
60/// Type representing a distributable message sink.
61/// Detached message sink must carry the protocol name for registering metrics.
62///
63/// See documentation for [`PeerContext`] for more details.
64type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
65
66#[async_trait::async_trait]
67impl MessageSink for NotificationSink {
68	/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
69	fn send_sync_notification(&self, notification: Vec<u8>) {
70		let sink = self.lock();
71
72		metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
73		sink.0.send_sync_notification(notification);
74	}
75
76	/// Send an asynchronous `notification` to the peer associated with this [`MessageSink`],
77	/// allowing sender to exercise backpressure.
78	///
79	/// Returns an error if the peer does not exist.
80	async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
81		// notification sink must be cloned because the lock cannot be held across `.await`
82		// this makes the implementation less efficient but not prohibitively so as the same
83		// method is also used by `NetworkService` when sending notifications.
84		let notification_len = notification.len();
85		let sink = self.lock().clone();
86		let permit = sink
87			.0
88			.reserve_notification()
89			.await
90			.map_err(|_| error::Error::ConnectionClosed)?;
91
92		permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
93			metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
94		})
95	}
96}
97
98/// Inner notification event to deal with `NotificationsSinks` without exposing that
99/// implementation detail to [`NotificationService`] consumers.
100#[derive(Debug)]
101enum InnerNotificationEvent {
102	/// Validate inbound substream.
103	ValidateInboundSubstream {
104		/// Peer ID.
105		peer: PeerId,
106
107		/// Received handshake.
108		handshake: Vec<u8>,
109
110		/// `oneshot::Sender` for sending validation result back to `Notifications`
111		result_tx: oneshot::Sender<ValidationResult>,
112	},
113
114	/// Notification substream open to `peer`.
115	NotificationStreamOpened {
116		/// Peer ID.
117		peer: PeerId,
118
119		/// Direction of the substream.
120		direction: Direction,
121
122		/// Received handshake.
123		handshake: Vec<u8>,
124
125		/// Negotiated fallback.
126		negotiated_fallback: Option<ProtocolName>,
127
128		/// Notification sink.
129		sink: NotificationsSink,
130	},
131
132	/// Substream was closed.
133	NotificationStreamClosed {
134		/// Peer ID.
135		peer: PeerId,
136	},
137
138	/// Notification was received from the substream.
139	NotificationReceived {
140		/// Peer ID.
141		peer: PeerId,
142
143		/// Received notification.
144		notification: Vec<u8>,
145	},
146
147	/// Notification sink has been replaced.
148	NotificationSinkReplaced {
149		/// Peer ID.
150		peer: PeerId,
151
152		/// Notification sink.
153		sink: NotificationsSink,
154	},
155}
156
157/// Notification commands.
158///
159/// Sent by the installed protocols to `Notifications` to open/close/modify substreams.
160#[derive(Debug)]
161pub enum NotificationCommand {
162	/// Instruct `Notifications` to open a substream to peer.
163	#[allow(unused)]
164	OpenSubstream(PeerId),
165
166	/// Instruct `Notifications` to close the substream to peer.
167	#[allow(unused)]
168	CloseSubstream(PeerId),
169
170	/// Set handshake for the notifications protocol.
171	SetHandshake(Vec<u8>),
172}
173
174/// Context assigned to each peer.
175///
176/// Contains `NotificationsSink` used by [`NotificationService`] to send notifications
177/// and an additional, distributable `NotificationsSink` which the protocol may acquire
178/// if it wishes to send notifications through `NotificationsSink` directly.
179///
180/// The distributable `NotificationsSink` is wrapped in an `Arc<Mutex<>>` to allow
181/// `NotificationsService` to swap the underlying sink in case it's replaced.
182#[derive(Debug, Clone)]
183struct PeerContext {
184	/// Sink for sending notifications.
185	sink: NotificationsSink,
186
187	/// Distributable notification sink.
188	shared_sink: NotificationSink,
189}
190
191/// Handle that is passed on to the notifications protocol.
192#[derive(Debug)]
193pub struct NotificationHandle {
194	/// Protocol name.
195	protocol: ProtocolName,
196
197	/// TX channel for sending commands to `Notifications`.
198	tx: mpsc::Sender<NotificationCommand>,
199
200	/// RX channel for receiving events from `Notifications`.
201	rx: TracingUnboundedReceiver<InnerNotificationEvent>,
202
203	/// All subscribers of `NotificationEvent`s.
204	subscribers: Subscribers,
205
206	/// Connected peers.
207	peers: HashMap<PeerId, PeerContext>,
208}
209
210impl NotificationHandle {
211	/// Create new [`NotificationHandle`].
212	fn new(
213		protocol: ProtocolName,
214		tx: mpsc::Sender<NotificationCommand>,
215		rx: TracingUnboundedReceiver<InnerNotificationEvent>,
216		subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
217	) -> Self {
218		Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
219	}
220}
221
222#[async_trait::async_trait]
223impl NotificationService for NotificationHandle {
224	/// Instruct `Notifications` to open a new substream for `peer`.
225	async fn open_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
226		todo!("support for opening substreams not implemented yet");
227	}
228
229	/// Instruct `Notifications` to close substream for `peer`.
230	async fn close_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
231		todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
232	}
233
234	/// Send synchronous `notification` to `peer`.
235	fn send_sync_notification(&mut self, peer: &sc_network_types::PeerId, notification: Vec<u8>) {
236		if let Some(info) = self.peers.get(&((*peer).into())) {
237			metrics::register_notification_sent(
238				info.sink.metrics(),
239				&self.protocol,
240				notification.len(),
241			);
242
243			let _ = info.sink.send_sync_notification(notification);
244		}
245	}
246
247	/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
248	async fn send_async_notification(
249		&mut self,
250		peer: &sc_network_types::PeerId,
251		notification: Vec<u8>,
252	) -> Result<(), error::Error> {
253		let notification_len = notification.len();
254		let sink = &self
255			.peers
256			.get(&peer.into())
257			.ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
258			.sink;
259
260		sink.reserve_notification()
261			.await
262			.map_err(|_| error::Error::ConnectionClosed)?
263			.send(notification)
264			.map_err(|_| error::Error::ChannelClosed)
265			.inspect(|_| {
266				metrics::register_notification_sent(
267					sink.metrics(),
268					&self.protocol,
269					notification_len,
270				);
271			})
272	}
273
274	/// Set handshake for the notification protocol replacing the old handshake.
275	async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
276		log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
277
278		self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
279	}
280
281	/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
282	/// and returns an error if the channel is blocked.
283	///
284	/// Technically the function can return an error if the channel to `Notifications` is closed
285	/// but that doesn't happen under normal operation.
286	fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
287		self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
288	}
289
290	/// Get next event from the `Notifications` event stream.
291	async fn next_event(&mut self) -> Option<NotificationEvent> {
292		loop {
293			match self.rx.next().await? {
294				InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } =>
295					return Some(NotificationEvent::ValidateInboundSubstream {
296						peer: peer.into(),
297						handshake,
298						result_tx,
299					}),
300				InnerNotificationEvent::NotificationStreamOpened {
301					peer,
302					handshake,
303					negotiated_fallback,
304					direction,
305					sink,
306				} => {
307					self.peers.insert(
308						peer,
309						PeerContext {
310							sink: sink.clone(),
311							shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
312						},
313					);
314					return Some(NotificationEvent::NotificationStreamOpened {
315						peer: peer.into(),
316						handshake,
317						direction,
318						negotiated_fallback,
319					})
320				},
321				InnerNotificationEvent::NotificationStreamClosed { peer } => {
322					self.peers.remove(&peer);
323					return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() })
324				},
325				InnerNotificationEvent::NotificationReceived { peer, notification } =>
326					return Some(NotificationEvent::NotificationReceived {
327						peer: peer.into(),
328						notification,
329					}),
330				InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
331					match self.peers.get_mut(&peer) {
332						None => log::error!(
333							"{}: notification sink replaced for {peer} but peer does not exist",
334							self.protocol
335						),
336						Some(context) => {
337							context.sink = sink.clone();
338							*context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
339						},
340					}
341				},
342			}
343		}
344	}
345
346	// Clone [`NotificationService`]
347	fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
348		let mut subscribers = self.subscribers.lock();
349
350		let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
351		subscribers.push(event_tx);
352
353		Ok(Box::new(NotificationHandle {
354			protocol: self.protocol.clone(),
355			tx: self.tx.clone(),
356			rx: event_rx,
357			peers: self.peers.clone(),
358			subscribers: self.subscribers.clone(),
359		}))
360	}
361
362	/// Get protocol name.
363	fn protocol(&self) -> &ProtocolName {
364		&self.protocol
365	}
366
367	/// Get message sink of the peer.
368	fn message_sink(&self, peer: &sc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
369		match self.peers.get(&peer.into()) {
370			Some(context) => Some(Box::new(context.shared_sink.clone())),
371			None => None,
372		}
373	}
374}
375
376/// Channel pair which allows `Notifications` to interact with a protocol.
377#[derive(Debug)]
378pub struct ProtocolHandlePair {
379	/// Protocol name.
380	protocol: ProtocolName,
381
382	/// Subscribers of the notification protocol events.
383	subscribers: Subscribers,
384
385	// Receiver for notification commands received from the protocol implementation.
386	rx: mpsc::Receiver<NotificationCommand>,
387}
388
389impl ProtocolHandlePair {
390	/// Create new [`ProtocolHandlePair`].
391	fn new(
392		protocol: ProtocolName,
393		subscribers: Subscribers,
394		rx: mpsc::Receiver<NotificationCommand>,
395	) -> Self {
396		Self { protocol, subscribers, rx }
397	}
398
399	/// Consume `self` and split [`ProtocolHandlePair`] into a handle which allows it to send events
400	/// to the protocol and a stream of commands received from the protocol.
401	pub(crate) fn split(
402		self,
403	) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
404		(
405			ProtocolHandle::new(self.protocol, self.subscribers),
406			Box::new(ReceiverStream::new(self.rx)),
407		)
408	}
409}
410
411/// Handle that is passed on to `Notifications` and allows it to directly communicate
412/// with the protocol.
413#[derive(Debug, Clone)]
414pub(crate) struct ProtocolHandle {
415	/// Protocol name.
416	protocol: ProtocolName,
417
418	/// Subscribers of the notification protocol.
419	subscribers: Subscribers,
420
421	/// Number of connected peers.
422	num_peers: usize,
423
424	/// Delegate validation to `Peerset`.
425	delegate_to_peerset: bool,
426
427	/// Prometheus metrics.
428	metrics: Option<NotificationMetrics>,
429}
430
431pub(crate) enum ValidationCallResult {
432	WaitForValidation(oneshot::Receiver<ValidationResult>),
433	Delegated,
434}
435
436impl ProtocolHandle {
437	/// Create new [`ProtocolHandle`].
438	fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
439		Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
440	}
441
442	/// Set metrics.
443	pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
444		self.metrics = Some(metrics);
445	}
446
447	/// Delegate validation to `Peerset`.
448	///
449	/// Protocols that do not do any validation themselves and only rely on `Peerset` handling
450	/// validation can disable protocol-side validation entirely by delegating all validation to
451	/// `Peerset`.
452	pub fn delegate_to_peerset(&mut self, delegate: bool) {
453		self.delegate_to_peerset = delegate;
454	}
455
456	/// Report to the protocol that a substream has been opened and it must be validated by the
457	/// protocol.
458	///
459	/// Return `oneshot::Receiver` which allows `Notifications` to poll for the validation result
460	/// from protocol.
461	pub fn report_incoming_substream(
462		&self,
463		peer: PeerId,
464		handshake: Vec<u8>,
465	) -> Result<ValidationCallResult, ()> {
466		let subscribers = self.subscribers.lock();
467
468		log::trace!(
469			target: LOG_TARGET,
470			"{}: report incoming substream for {peer}, handshake {handshake:?}",
471			self.protocol
472		);
473
474		if self.delegate_to_peerset {
475			return Ok(ValidationCallResult::Delegated)
476		}
477
478		// if there is only one subscriber, `Notifications` can wait directly on the
479		// `oneshot::channel()`'s RX half without indirection
480		if subscribers.len() == 1 {
481			let (result_tx, rx) = oneshot::channel();
482			return subscribers[0]
483				.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
484					peer,
485					handshake,
486					result_tx,
487				})
488				.map(|_| ValidationCallResult::WaitForValidation(rx))
489				.map_err(|_| ())
490		}
491
492		// if there are multiple subscribers, create a task which waits for all of the
493		// validations to finish and returns the combined result to `Notifications`
494		let mut results: FuturesUnordered<_> = subscribers
495			.iter()
496			.filter_map(|subscriber| {
497				let (result_tx, rx) = oneshot::channel();
498
499				subscriber
500					.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
501						peer,
502						handshake: handshake.clone(),
503						result_tx,
504					})
505					.is_ok()
506					.then_some(rx)
507			})
508			.collect();
509
510		let (tx, rx) = oneshot::channel();
511		tokio::spawn(async move {
512			while let Some(event) = results.next().await {
513				match event {
514					Err(_) | Ok(ValidationResult::Reject) =>
515						return tx.send(ValidationResult::Reject),
516					Ok(ValidationResult::Accept) => {},
517				}
518			}
519
520			return tx.send(ValidationResult::Accept)
521		});
522
523		Ok(ValidationCallResult::WaitForValidation(rx))
524	}
525
526	/// Report to the protocol that a substream has been opened and that it can now use the handle
527	/// to send notifications to the remote peer.
528	pub fn report_substream_opened(
529		&mut self,
530		peer: PeerId,
531		direction: Direction,
532		handshake: Vec<u8>,
533		negotiated_fallback: Option<ProtocolName>,
534		sink: NotificationsSink,
535	) -> Result<(), ()> {
536		metrics::register_substream_opened(&self.metrics, &self.protocol);
537
538		let mut subscribers = self.subscribers.lock();
539		log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
540
541		subscribers.retain(|subscriber| {
542			subscriber
543				.unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
544					peer,
545					direction,
546					handshake: handshake.clone(),
547					negotiated_fallback: negotiated_fallback.clone(),
548					sink: sink.clone(),
549				})
550				.is_ok()
551		});
552		self.num_peers += 1;
553
554		Ok(())
555	}
556
557	/// Substream was closed.
558	pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
559		metrics::register_substream_closed(&self.metrics, &self.protocol);
560
561		let mut subscribers = self.subscribers.lock();
562		log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
563
564		subscribers.retain(|subscriber| {
565			subscriber
566				.unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
567				.is_ok()
568		});
569		self.num_peers -= 1;
570
571		Ok(())
572	}
573
574	/// Notification was received from the substream.
575	pub fn report_notification_received(
576		&mut self,
577		peer: PeerId,
578		notification: Vec<u8>,
579	) -> Result<(), ()> {
580		metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
581
582		let mut subscribers = self.subscribers.lock();
583		log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
584
585		subscribers.retain(|subscriber| {
586			subscriber
587				.unbounded_send(InnerNotificationEvent::NotificationReceived {
588					peer,
589					notification: notification.clone(),
590				})
591				.is_ok()
592		});
593
594		Ok(())
595	}
596
597	/// Notification sink was replaced.
598	pub fn report_notification_sink_replaced(
599		&mut self,
600		peer: PeerId,
601		sink: NotificationsSink,
602	) -> Result<(), ()> {
603		let mut subscribers = self.subscribers.lock();
604
605		log::trace!(
606			target: LOG_TARGET,
607			"{}: notification sink replaced for {peer:?}",
608			self.protocol
609		);
610
611		subscribers.retain(|subscriber| {
612			subscriber
613				.unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
614					peer,
615					sink: sink.clone(),
616				})
617				.is_ok()
618		});
619
620		Ok(())
621	}
622
623	/// Get the number of connected peers.
624	pub fn num_peers(&self) -> usize {
625		self.num_peers
626	}
627}
628
629/// Create new (protocol, notification) handle pair.
630///
631/// Handle pair allows `Notifications` and the protocol to communicate with each other directly.
632pub fn notification_service(
633	protocol: ProtocolName,
634) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
635	let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
636
637	let (event_tx, event_rx) =
638		tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
639	let subscribers = Arc::new(Mutex::new(vec![event_tx]));
640
641	(
642		ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
643		Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
644	)
645}
646
647// Decorates the mpsc-notification-to-protocol metric with the name of the protocol,
648// to be able to distiguish between different protocols in dashboards.
649fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
650	let protocol_name = protocol.to_string();
651	let keys = protocol_name.split("/").collect::<Vec<_>>();
652	keys.iter()
653		.rev()
654		.take(2) // Last two tokens give the protocol name and version
655		.fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
656}