1use crate::{
22 error,
23 protocol::notifications::handler::NotificationsSink,
24 service::{
25 metrics::NotificationMetrics,
26 traits::{
27 Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
28 },
29 },
30 types::ProtocolName,
31};
32
33use futures::{
34 stream::{FuturesUnordered, Stream},
35 StreamExt,
36};
37use libp2p::PeerId;
38use parking_lot::Mutex;
39use tokio::sync::{mpsc, oneshot};
40use tokio_stream::wrappers::ReceiverStream;
41
42use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
43
44use std::{collections::HashMap, fmt::Debug, sync::Arc};
45
46pub(crate) mod metrics;
47
48#[cfg(test)]
49mod tests;
50
/// Logging target used by the log statements in this file.
const LOG_TARGET: &str = "sub-libp2p";

/// Capacity of the bounded command channel created by [`notification_service()`].
const COMMAND_QUEUE_SIZE: usize = 64;

/// Shared, lock-protected list of event senders.
///
/// [`NotificationHandle::clone()`] pushes a new sender into the list and the `report_*` methods
/// of [`ProtocolHandle`] drop every sender for which sending an event fails.
type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;

/// Shared, lock-protected pair of a [`NotificationsSink`] and the protocol name that is passed to
/// the metrics functions when a notification is sent through the sink.
type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
65
66#[async_trait::async_trait]
67impl MessageSink for NotificationSink {
68 fn send_sync_notification(&self, notification: Vec<u8>) {
70 let sink = self.lock();
71
72 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
73 sink.0.send_sync_notification(notification);
74 }
75
76 async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
81 let notification_len = notification.len();
85 let sink = self.lock().clone();
86 let permit = sink
87 .0
88 .reserve_notification()
89 .await
90 .map_err(|_| error::Error::ConnectionClosed)?;
91
92 permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
93 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
94 })
95 }
96}
97
/// Events sent from [`ProtocolHandle`] to every subscribed [`NotificationHandle`].
///
/// [`NotificationHandle::next_event()`] converts these into [`NotificationEvent`]s, except for
/// `NotificationSinkReplaced`, which is handled internally.
#[derive(Debug)]
enum InnerNotificationEvent {
    /// An inbound substream needs to be validated; sent by
    /// [`ProtocolHandle::report_incoming_substream()`].
    ValidateInboundSubstream {
        /// Peer that the substream belongs to.
        peer: PeerId,

        /// Handshake bytes reported together with the substream.
        handshake: Vec<u8>,

        /// Channel through which the [`ValidationResult`] is sent back.
        result_tx: oneshot::Sender<ValidationResult>,
    },

    /// A substream was opened; sent by [`ProtocolHandle::report_substream_opened()`].
    NotificationStreamOpened {
        /// Peer that the substream belongs to.
        peer: PeerId,

        /// Direction of the substream.
        direction: Direction,

        /// Handshake bytes reported together with the substream.
        handshake: Vec<u8>,

        /// Negotiated fallback protocol name, if any.
        negotiated_fallback: Option<ProtocolName>,

        /// Sink that [`NotificationHandle`] stores for the peer and uses to send notifications.
        sink: NotificationsSink,
    },

    /// A substream was closed; sent by [`ProtocolHandle::report_substream_closed()`].
    NotificationStreamClosed {
        /// Peer that the substream belonged to.
        peer: PeerId,
    },

    /// A notification was received; sent by [`ProtocolHandle::report_notification_received()`].
    NotificationReceived {
        /// Peer that the notification is reported for.
        peer: PeerId,

        /// Notification payload.
        notification: Vec<u8>,
    },

    /// The sink of a peer was replaced; sent by
    /// [`ProtocolHandle::report_notification_sink_replaced()`].
    NotificationSinkReplaced {
        /// Peer whose sink was replaced.
        peer: PeerId,

        /// New sink that replaces the one stored for the peer.
        sink: NotificationsSink,
    },
}
156
/// Commands sent by [`NotificationHandle`] over the command channel created in
/// [`notification_service()`].
#[derive(Debug)]
pub enum NotificationCommand {
    /// Open a substream to the peer.
    ///
    /// NOTE(review): never constructed in this file; `open_substream()` is still a `todo!()`.
    #[allow(unused)]
    OpenSubstream(PeerId),

    /// Close the substream of the peer.
    ///
    /// NOTE(review): never constructed in this file; `close_substream()` is still a `todo!()`.
    #[allow(unused)]
    CloseSubstream(PeerId),

    /// Set the handshake to the given bytes; sent by `set_handshake()` and
    /// `try_set_handshake()`.
    SetHandshake(Vec<u8>),
}
173
/// Per-peer state kept by [`NotificationHandle`].
///
/// Cloning the context clones `sink` but shares `shared_sink`, since the latter is an `Arc`.
#[derive(Debug, Clone)]
struct PeerContext {
    /// Sink used by the `send_*_notification()` methods of [`NotificationHandle`].
    sink: NotificationsSink,

    /// Sink handed out by [`NotificationHandle::message_sink()`]; its contents are overwritten
    /// in place when the peer's sink is replaced.
    shared_sink: NotificationSink,
}
190
/// Handle through which a protocol receives notification events and sends notifications and
/// commands; implements [`NotificationService`].
#[derive(Debug)]
pub struct NotificationHandle {
    /// Name of the protocol this handle belongs to.
    protocol: ProtocolName,

    /// Sending side of the bounded [`NotificationCommand`] channel.
    tx: mpsc::Sender<NotificationCommand>,

    /// Receiving side of this handle's event channel.
    rx: TracingUnboundedReceiver<InnerNotificationEvent>,

    /// Shared list of event senders; extended when the handle is cloned.
    subscribers: Subscribers,

    /// Peers for which a stream-opened event has been processed and no stream-closed event yet.
    peers: HashMap<PeerId, PeerContext>,
}
209
210impl NotificationHandle {
211 fn new(
213 protocol: ProtocolName,
214 tx: mpsc::Sender<NotificationCommand>,
215 rx: TracingUnboundedReceiver<InnerNotificationEvent>,
216 subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
217 ) -> Self {
218 Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
219 }
220}
221
222#[async_trait::async_trait]
223impl NotificationService for NotificationHandle {
224 async fn open_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
226 todo!("support for opening substreams not implemented yet");
227 }
228
229 async fn close_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
231 todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
232 }
233
234 fn send_sync_notification(&mut self, peer: &sc_network_types::PeerId, notification: Vec<u8>) {
236 if let Some(info) = self.peers.get(&((*peer).into())) {
237 metrics::register_notification_sent(
238 info.sink.metrics(),
239 &self.protocol,
240 notification.len(),
241 );
242
243 let _ = info.sink.send_sync_notification(notification);
244 }
245 }
246
247 async fn send_async_notification(
249 &mut self,
250 peer: &sc_network_types::PeerId,
251 notification: Vec<u8>,
252 ) -> Result<(), error::Error> {
253 let notification_len = notification.len();
254 let sink = &self
255 .peers
256 .get(&peer.into())
257 .ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
258 .sink;
259
260 sink.reserve_notification()
261 .await
262 .map_err(|_| error::Error::ConnectionClosed)?
263 .send(notification)
264 .map_err(|_| error::Error::ChannelClosed)
265 .inspect(|_| {
266 metrics::register_notification_sent(
267 sink.metrics(),
268 &self.protocol,
269 notification_len,
270 );
271 })
272 }
273
274 async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
276 log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
277
278 self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
279 }
280
281 fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
287 self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
288 }
289
290 async fn next_event(&mut self) -> Option<NotificationEvent> {
292 loop {
293 match self.rx.next().await? {
294 InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } =>
295 return Some(NotificationEvent::ValidateInboundSubstream {
296 peer: peer.into(),
297 handshake,
298 result_tx,
299 }),
300 InnerNotificationEvent::NotificationStreamOpened {
301 peer,
302 handshake,
303 negotiated_fallback,
304 direction,
305 sink,
306 } => {
307 self.peers.insert(
308 peer,
309 PeerContext {
310 sink: sink.clone(),
311 shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
312 },
313 );
314 return Some(NotificationEvent::NotificationStreamOpened {
315 peer: peer.into(),
316 handshake,
317 direction,
318 negotiated_fallback,
319 })
320 },
321 InnerNotificationEvent::NotificationStreamClosed { peer } => {
322 self.peers.remove(&peer);
323 return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() })
324 },
325 InnerNotificationEvent::NotificationReceived { peer, notification } =>
326 return Some(NotificationEvent::NotificationReceived {
327 peer: peer.into(),
328 notification,
329 }),
330 InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
331 match self.peers.get_mut(&peer) {
332 None => log::error!(
333 "{}: notification sink replaced for {peer} but peer does not exist",
334 self.protocol
335 ),
336 Some(context) => {
337 context.sink = sink.clone();
338 *context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
339 },
340 }
341 },
342 }
343 }
344 }
345
346 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
348 let mut subscribers = self.subscribers.lock();
349
350 let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
351 subscribers.push(event_tx);
352
353 Ok(Box::new(NotificationHandle {
354 protocol: self.protocol.clone(),
355 tx: self.tx.clone(),
356 rx: event_rx,
357 peers: self.peers.clone(),
358 subscribers: self.subscribers.clone(),
359 }))
360 }
361
362 fn protocol(&self) -> &ProtocolName {
364 &self.protocol
365 }
366
367 fn message_sink(&self, peer: &sc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
369 match self.peers.get(&peer.into()) {
370 Some(context) => Some(Box::new(context.shared_sink.clone())),
371 None => None,
372 }
373 }
374}
375
/// Protocol-side counterpart of the [`NotificationService`] returned by
/// [`notification_service()`]; `split()` turns it into a [`ProtocolHandle`] and a command stream.
#[derive(Debug)]
pub struct ProtocolHandlePair {
    /// Name of the protocol.
    protocol: ProtocolName,

    /// Shared list of event senders.
    subscribers: Subscribers,

    /// Receiving side of the bounded [`NotificationCommand`] channel.
    rx: mpsc::Receiver<NotificationCommand>,
}
388
389impl ProtocolHandlePair {
390 fn new(
392 protocol: ProtocolName,
393 subscribers: Subscribers,
394 rx: mpsc::Receiver<NotificationCommand>,
395 ) -> Self {
396 Self { protocol, subscribers, rx }
397 }
398
399 pub(crate) fn split(
402 self,
403 ) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
404 (
405 ProtocolHandle::new(self.protocol, self.subscribers),
406 Box::new(ReceiverStream::new(self.rx)),
407 )
408 }
409}
410
/// Handle used to report substream and notification events to all subscribed
/// [`NotificationHandle`]s.
///
/// The handle is `Clone`; `subscribers` is shared between clones while `num_peers` is copied and
/// then tracked independently by each clone.
#[derive(Debug, Clone)]
pub(crate) struct ProtocolHandle {
    /// Name of the protocol.
    protocol: ProtocolName,

    /// Shared list of event senders.
    subscribers: Subscribers,

    /// Counter incremented by `report_substream_opened()` and decremented by
    /// `report_substream_closed()`.
    num_peers: usize,

    /// If `true`, `report_incoming_substream()` returns [`ValidationCallResult::Delegated`]
    /// without asking the subscribers.
    delegate_to_peerset: bool,

    /// Optional metrics passed to the `metrics::register_*` functions.
    metrics: Option<NotificationMetrics>,
}
430
/// Outcome of [`ProtocolHandle::report_incoming_substream()`].
pub(crate) enum ValidationCallResult {
    /// Validation was requested; the [`ValidationResult`] arrives through the receiver.
    WaitForValidation(oneshot::Receiver<ValidationResult>),
    /// No subscriber was asked because validation is delegated (see
    /// [`ProtocolHandle::delegate_to_peerset()`]).
    Delegated,
}
435
436impl ProtocolHandle {
437 fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
439 Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
440 }
441
442 pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
444 self.metrics = Some(metrics);
445 }
446
447 pub fn delegate_to_peerset(&mut self, delegate: bool) {
453 self.delegate_to_peerset = delegate;
454 }
455
456 pub fn report_incoming_substream(
462 &self,
463 peer: PeerId,
464 handshake: Vec<u8>,
465 ) -> Result<ValidationCallResult, ()> {
466 let subscribers = self.subscribers.lock();
467
468 log::trace!(
469 target: LOG_TARGET,
470 "{}: report incoming substream for {peer}, handshake {handshake:?}",
471 self.protocol
472 );
473
474 if self.delegate_to_peerset {
475 return Ok(ValidationCallResult::Delegated)
476 }
477
478 if subscribers.len() == 1 {
481 let (result_tx, rx) = oneshot::channel();
482 return subscribers[0]
483 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
484 peer,
485 handshake,
486 result_tx,
487 })
488 .map(|_| ValidationCallResult::WaitForValidation(rx))
489 .map_err(|_| ())
490 }
491
492 let mut results: FuturesUnordered<_> = subscribers
495 .iter()
496 .filter_map(|subscriber| {
497 let (result_tx, rx) = oneshot::channel();
498
499 subscriber
500 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
501 peer,
502 handshake: handshake.clone(),
503 result_tx,
504 })
505 .is_ok()
506 .then_some(rx)
507 })
508 .collect();
509
510 let (tx, rx) = oneshot::channel();
511 tokio::spawn(async move {
512 while let Some(event) = results.next().await {
513 match event {
514 Err(_) | Ok(ValidationResult::Reject) =>
515 return tx.send(ValidationResult::Reject),
516 Ok(ValidationResult::Accept) => {},
517 }
518 }
519
520 return tx.send(ValidationResult::Accept)
521 });
522
523 Ok(ValidationCallResult::WaitForValidation(rx))
524 }
525
526 pub fn report_substream_opened(
529 &mut self,
530 peer: PeerId,
531 direction: Direction,
532 handshake: Vec<u8>,
533 negotiated_fallback: Option<ProtocolName>,
534 sink: NotificationsSink,
535 ) -> Result<(), ()> {
536 metrics::register_substream_opened(&self.metrics, &self.protocol);
537
538 let mut subscribers = self.subscribers.lock();
539 log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
540
541 subscribers.retain(|subscriber| {
542 subscriber
543 .unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
544 peer,
545 direction,
546 handshake: handshake.clone(),
547 negotiated_fallback: negotiated_fallback.clone(),
548 sink: sink.clone(),
549 })
550 .is_ok()
551 });
552 self.num_peers += 1;
553
554 Ok(())
555 }
556
557 pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
559 metrics::register_substream_closed(&self.metrics, &self.protocol);
560
561 let mut subscribers = self.subscribers.lock();
562 log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
563
564 subscribers.retain(|subscriber| {
565 subscriber
566 .unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
567 .is_ok()
568 });
569 self.num_peers -= 1;
570
571 Ok(())
572 }
573
574 pub fn report_notification_received(
576 &mut self,
577 peer: PeerId,
578 notification: Vec<u8>,
579 ) -> Result<(), ()> {
580 metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
581
582 let mut subscribers = self.subscribers.lock();
583 log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
584
585 subscribers.retain(|subscriber| {
586 subscriber
587 .unbounded_send(InnerNotificationEvent::NotificationReceived {
588 peer,
589 notification: notification.clone(),
590 })
591 .is_ok()
592 });
593
594 Ok(())
595 }
596
597 pub fn report_notification_sink_replaced(
599 &mut self,
600 peer: PeerId,
601 sink: NotificationsSink,
602 ) -> Result<(), ()> {
603 let mut subscribers = self.subscribers.lock();
604
605 log::trace!(
606 target: LOG_TARGET,
607 "{}: notification sink replaced for {peer:?}",
608 self.protocol
609 );
610
611 subscribers.retain(|subscriber| {
612 subscriber
613 .unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
614 peer,
615 sink: sink.clone(),
616 })
617 .is_ok()
618 });
619
620 Ok(())
621 }
622
623 pub fn num_peers(&self) -> usize {
625 self.num_peers
626 }
627}
628
629pub fn notification_service(
633 protocol: ProtocolName,
634) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
635 let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
636
637 let (event_tx, event_rx) =
638 tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
639 let subscribers = Arc::new(Mutex::new(vec![event_tx]));
640
641 (
642 ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
643 Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
644 )
645}
646
647fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
650 let protocol_name = protocol.to_string();
651 let keys = protocol_name.split("/").collect::<Vec<_>>();
652 keys.iter()
653 .rev()
654 .take(2) .fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
656}