use crate::{
	behaviour::{self, Behaviour, BehaviourOut},
	bitswap::BitswapRequestHandler,
	config::{
		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
	},
	discovery::DiscoveryConfig,
	error::Error,
	event::{DhtEvent, Event},
	network_state::{
		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
	},
	peer_store::{PeerStore, PeerStoreProvider},
	protocol::{self, NotifsHandlerError, Protocol, Ready},
	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
	service::{
		signature::{Signature, SigningError},
		traits::{
			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
		},
	},
	transport,
	types::ProtocolName,
	NotificationService, ReputationChange,
};

use codec::DecodeAll;
use either::Either;
use futures::{channel::oneshot, prelude::*};
#[allow(deprecated)]
use libp2p::swarm::THandlerErr;
use libp2p::{
	connection_limits::{ConnectionLimits, Exceeded},
	core::{upgrade, ConnectedPoint, Endpoint},
	identify::Info as IdentifyInfo,
	identity::ed25519,
	kad::{record::Key as KademliaKey, Record},
	multiaddr::{self, Multiaddr},
	swarm::{
		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
		NetworkBehaviour, Swarm, SwarmEvent,
	},
	PeerId,
};
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, MetricSources, Metrics};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;

use sc_client_api::BlockBackend;
use sc_network_common::{
	role::{ObservedRole, Roles},
	ExHashT,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;

pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
pub use metrics::NotificationMetrics;
pub use protocol::NotificationsSink;
use std::{
	cmp,
	collections::{HashMap, HashSet},
	fs, iter,
	marker::PhantomData,
	num::NonZeroUsize,
	pin::Pin,
	str,
	sync::{
		atomic::{AtomicUsize, Ordering},
		Arc,
	},
	time::{Duration, Instant},
};

pub(crate) mod metrics;
pub(crate) mod out_events;

pub mod signature;
pub mod traits;

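/// Bandwidth sink backed by the byte counters maintained by the libp2p transport.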
struct Libp2pBandwidthSink {
	sink: Arc<transport::BandwidthSinks>,
}

impl BandwidthSink for Libp2pBandwidthSink {
	fn total_inbound(&self) -> u64 {
		self.sink.total_inbound()
	}

	fn total_outbound(&self) -> u64 {
		self.sink.total_outbound()
	}
}
130
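/// Substrate network service. Cheaply cloneable handle used by the rest of the node to interact
/// with the background [`NetworkWorker`]: it mostly forwards requests over the `to_worker`
/// channel and exposes shared state such as addresses and the peer store.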
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
	/// Number of connected peers, as tracked by the worker.
	num_connected: Arc<AtomicUsize>,
	/// Externally reachable addresses of this node.
	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Addresses this node is currently listening on.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	local_peer_id: PeerId,
	local_identity: Keypair,
	bandwidth: Arc<dyn BandwidthSink>,
	/// Channel for sending requests to the background [`NetworkWorker`].
	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
	/// Maps notification protocol names to the `SetId`s used by the protocol controllers.
	notification_protocol_ids: HashMap<ProtocolName, SetId>,
	/// One protocol controller handle per notification protocol, indexed by `SetId`.
	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
	/// Shortcut to the handle of the sync (block announces) protocol, i.e. `protocol_handles[0]`.
	sync_protocol_handle: protocol_controller::ProtocolHandle,
	/// Handle to the peer store (reputations and known peers).
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	_marker: PhantomData<H>,
	_block: PhantomData<B>,
}
163
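// libp2p-backed implementation of the `NetworkBackend` trait, with `NetworkWorker` as the
// backend and `NetworkService` as the handle given out to the rest of the node.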
164#[async_trait::async_trait]
165impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
166where
167 B: BlockT + 'static,
168 H: ExHashT,
169{
170 type NotificationProtocolConfig = NonDefaultSetConfig;
171 type RequestResponseProtocolConfig = RequestResponseConfig;
172 type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
173 type PeerStore = PeerStore;
174 type BitswapConfig = RequestResponseConfig;
175
176 fn new(params: Params<B, H, Self>) -> Result<Self, Error>
177 where
178 Self: Sized,
179 {
180 NetworkWorker::new(params)
181 }
182
183 fn network_service(&self) -> Arc<dyn NetworkServiceT> {
185 self.service.clone()
186 }
187
188 fn peer_store(
190 bootnodes: Vec<sc_network_types::PeerId>,
191 metrics_registry: Option<Registry>,
192 ) -> Self::PeerStore {
193 PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
194 }
195
196 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
197 NotificationMetrics::new(registry)
198 }
199
200 fn bitswap_server(
201 client: Arc<dyn BlockBackend<B> + Send + Sync>,
202 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
203 let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
204
205 (Box::pin(async move { handler.run().await }), protocol_config)
206 }
207
208 fn notification_config(
210 protocol_name: ProtocolName,
211 fallback_names: Vec<ProtocolName>,
212 max_notification_size: u64,
213 handshake: Option<NotificationHandshake>,
214 set_config: SetConfig,
215 _metrics: NotificationMetrics,
216 _peerstore_handle: Arc<dyn PeerStoreProvider>,
217 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
218 NonDefaultSetConfig::new(
219 protocol_name,
220 fallback_names,
221 max_notification_size,
222 handshake,
223 set_config,
224 )
225 }
226
227 fn request_response_config(
229 protocol_name: ProtocolName,
230 fallback_names: Vec<ProtocolName>,
231 max_request_size: u64,
232 max_response_size: u64,
233 request_timeout: Duration,
234 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
235 ) -> Self::RequestResponseProtocolConfig {
236 Self::RequestResponseProtocolConfig {
237 name: protocol_name,
238 fallback_names,
239 max_request_size,
240 max_response_size,
241 request_timeout,
242 inbound_queue,
243 }
244 }
245
246 async fn run(mut self) {
248 self.run().await
249 }
250}
251
252impl<B, H> NetworkWorker<B, H>
253where
254 B: BlockT + 'static,
255 H: ExHashT,
256{
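	/// Creates the network worker from the given [`Params`].
	///
	/// Returns an error if, for example, the node key cannot be turned into a keypair, the
	/// configured addresses are inconsistent with the transport, or two bootnodes share an
	/// address but report different peer IDs.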
257 pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
263 let peer_store_handle = params.network_config.peer_store_handle();
264 let FullNetworkConfiguration {
265 notification_protocols,
266 request_response_protocols,
267 mut network_config,
268 ..
269 } = params.network_config;
270
271 let local_identity = network_config.node_key.clone().into_keypair()?;
273 let local_public = local_identity.public();
274 let local_peer_id = local_public.to_peer_id();
275
276 let local_identity: ed25519::Keypair = local_identity.into();
278 let local_public: ed25519::PublicKey = local_public.into();
279 let local_peer_id: PeerId = local_peer_id.into();
280
281 network_config.boot_nodes = network_config
282 .boot_nodes
283 .into_iter()
284 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
285 .collect();
286 network_config.default_peers_set.reserved_nodes = network_config
287 .default_peers_set
288 .reserved_nodes
289 .into_iter()
290 .filter(|reserved_node| {
291 if reserved_node.peer_id == local_peer_id.into() {
292 warn!(
293 target: "sub-libp2p",
294 "Local peer ID used in reserved node, ignoring: {}",
295 reserved_node,
296 );
297 false
298 } else {
299 true
300 }
301 })
302 .collect();
303
304 ensure_addresses_consistent_with_transport(
306 network_config.listen_addresses.iter(),
307 &network_config.transport,
308 )?;
309 ensure_addresses_consistent_with_transport(
310 network_config.boot_nodes.iter().map(|x| &x.multiaddr),
311 &network_config.transport,
312 )?;
313 ensure_addresses_consistent_with_transport(
314 network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
315 &network_config.transport,
316 )?;
		for notification_protocol in &notification_protocols {
318 ensure_addresses_consistent_with_transport(
319 notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
320 &network_config.transport,
321 )?;
322 }
323 ensure_addresses_consistent_with_transport(
324 network_config.public_addresses.iter(),
325 &network_config.transport,
326 )?;
327
328 let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
329
330 if let Some(path) = &network_config.net_config_path {
331 fs::create_dir_all(path)?;
332 }
333
334 info!(
335 target: "sub-libp2p",
336 "🏷 Local node identity is: {}",
337 local_peer_id.to_base58(),
338 );
339 log::info!(target: "sub-libp2p", "Running libp2p network backend");
340
341 let (transport, bandwidth) = {
342 let config_mem = match network_config.transport {
343 TransportConfig::MemoryOnly => true,
344 TransportConfig::Normal { .. } => false,
345 };
346
347 let yamux_maximum_buffer_size = {
354 let requests_max = request_response_protocols
355 .iter()
356 .map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
357 let responses_max = request_response_protocols
358 .iter()
359 .map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
360 let notifs_max = notification_protocols
361 .iter()
362 .map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX));
363
364 let default_max = cmp::max(
367 1024 * 1024,
368 usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
369 .unwrap_or(usize::MAX),
370 );
371
372 iter::once(default_max)
373 .chain(requests_max)
374 .chain(responses_max)
375 .chain(notifs_max)
376 .max()
377 .expect("iterator known to always yield at least one element; qed")
378 .saturating_add(10)
379 };
380
381 transport::build_transport(
382 local_identity.clone().into(),
383 config_mem,
384 network_config.yamux_window_size,
385 yamux_maximum_buffer_size,
386 )
387 };
388
389 let (to_notifications, from_protocol_controllers) =
390 tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
391
392 let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
394 .chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
395
396 let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
397 .enumerate()
398 .map(|(set_id, set_config)| {
399 let proto_set_config = ProtoSetConfig {
400 in_peers: set_config.in_peers,
401 out_peers: set_config.out_peers,
402 reserved_nodes: set_config
403 .reserved_nodes
404 .iter()
405 .map(|node| node.peer_id.into())
406 .collect(),
407 reserved_only: set_config.non_reserved_mode.is_reserved_only(),
408 };
409
410 ProtocolController::new(
411 SetId::from(set_id),
412 proto_set_config,
413 to_notifications.clone(),
414 Arc::clone(&peer_store_handle),
415 )
416 })
417 .unzip();
418
419 let sync_protocol_handle = protocol_handles[0].clone();
421
422 protocol_controllers
424 .into_iter()
425 .for_each(|controller| (params.executor)(controller.run().boxed()));
426
427 let notification_protocol_ids: HashMap<ProtocolName, SetId> =
			iter::once(&params.block_announce_config)
431 .chain(notification_protocols.iter())
432 .enumerate()
433 .map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
434 .collect();
435
436 let known_addresses = {
437 let mut addresses: Vec<_> = network_config
439 .default_peers_set
440 .reserved_nodes
441 .iter()
442 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
443 .chain(notification_protocols.iter().flat_map(|protocol| {
444 protocol
445 .set_config()
446 .reserved_nodes
447 .iter()
448 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
449 }))
450 .chain(
451 network_config
452 .boot_nodes
453 .iter()
454 .map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
455 )
456 .collect();
457
458 addresses.sort();
460 addresses.dedup();
461
462 addresses
463 };
464
465 network_config.boot_nodes.iter().try_for_each(|bootnode| {
467 if let Some(other) = network_config
468 .boot_nodes
469 .iter()
470 .filter(|o| o.multiaddr == bootnode.multiaddr)
471 .find(|o| o.peer_id != bootnode.peer_id)
472 {
473 Err(Error::DuplicateBootnode {
474 address: bootnode.multiaddr.clone().into(),
475 first_id: bootnode.peer_id.into(),
476 second_id: other.peer_id.into(),
477 })
478 } else {
479 Ok(())
480 }
481 })?;
482
483 let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
485
486 for bootnode in network_config.boot_nodes.iter() {
487 boot_node_ids
488 .entry(bootnode.peer_id.into())
489 .or_default()
490 .push(bootnode.multiaddr.clone().into());
491 }
492
493 let boot_node_ids = Arc::new(boot_node_ids);
494
495 let num_connected = Arc::new(AtomicUsize::new(0));
496 let external_addresses = Arc::new(Mutex::new(HashSet::new()));
497
498 let (protocol, notif_protocol_handles) = Protocol::new(
			From::from(&params.role),
500 params.notification_metrics,
501 notification_protocols,
502 params.block_announce_config,
503 Arc::clone(&peer_store_handle),
504 protocol_handles.clone(),
505 from_protocol_controllers,
506 )?;
507
508 let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
510 let user_agent =
511 format!("{} ({})", network_config.client_version, network_config.node_name);
512
513 let discovery_config = {
514 let mut config = DiscoveryConfig::new(local_peer_id);
515 config.with_permanent_addresses(
516 known_addresses
517 .iter()
518 .map(|(peer, address)| (peer.into(), address.clone().into()))
519 .collect::<Vec<_>>(),
520 );
521 config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
522 config.with_kademlia(
523 params.genesis_hash,
524 params.fork_id.as_deref(),
				&params.protocol_id,
526 );
527 config.with_dht_random_walk(network_config.enable_dht_random_walk);
528 config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
529 config.use_kademlia_disjoint_query_paths(
530 network_config.kademlia_disjoint_query_paths,
531 );
532 config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
533
534 match network_config.transport {
535 TransportConfig::MemoryOnly => {
536 config.with_mdns(false);
537 config.allow_private_ip(false);
538 },
539 TransportConfig::Normal {
540 enable_mdns,
541 allow_private_ip: allow_private_ipv4,
542 ..
543 } => {
544 config.with_mdns(enable_mdns);
545 config.allow_private_ip(allow_private_ipv4);
546 },
547 }
548
549 config
550 };
551
552 let behaviour = {
553 let result = Behaviour::new(
554 protocol,
555 user_agent,
556 local_public.into(),
557 discovery_config,
558 request_response_protocols,
559 Arc::clone(&peer_store_handle),
560 external_addresses.clone(),
561 ConnectionLimits::default()
562 .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
563 .with_max_established_incoming(Some(
564 crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
565 )),
566 );
567
568 match result {
569 Ok(b) => b,
570 Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) =>
571 return Err(Error::DuplicateRequestResponseProtocol { protocol: proto }),
572 }
573 };
574
575 let swarm = {
576 struct SpawnImpl<F>(F);
577 impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
578 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
579 (self.0)(f)
580 }
581 }
582
583 let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
584 .with_substream_upgrade_protocol_override(upgrade::Version::V1)
585 .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
586 .with_per_connection_event_buffer_size(24)
589 .with_max_negotiating_inbound_streams(2048)
590 .with_idle_connection_timeout(Duration::from_secs(10));
591
592 Swarm::new(transport, behaviour, local_peer_id, config)
593 };
594
595 (swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
596 };
597
		let metrics = match &params.metrics_registry {
600 Some(registry) => Some(metrics::register(
601 registry,
602 MetricSources {
603 bandwidth: bandwidth.clone(),
604 connected_peers: num_connected.clone(),
605 },
606 )?),
607 None => None,
608 };
609
610 for addr in &network_config.listen_addresses {
612 if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
613 warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
614 }
615 }
616
617 for addr in &network_config.public_addresses {
619 Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
620 }
621
622 let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
623
624 let service = Arc::new(NetworkService {
625 bandwidth,
626 external_addresses,
627 listen_addresses: listen_addresses_set.clone(),
628 num_connected: num_connected.clone(),
629 local_peer_id,
630 local_identity: local_identity.into(),
631 to_worker,
632 notification_protocol_ids,
633 protocol_handles,
634 sync_protocol_handle,
635 peer_store_handle: Arc::clone(&peer_store_handle),
636 _marker: PhantomData,
637 _block: Default::default(),
638 });
639
640 Ok(NetworkWorker {
641 listen_addresses: listen_addresses_set,
642 num_connected,
643 network_service: swarm,
644 service,
645 from_service,
646 event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
647 metrics,
648 boot_node_ids,
649 reported_invalid_boot_nodes: Default::default(),
650 peer_store_handle: Arc::clone(&peer_store_handle),
651 notif_protocol_handles,
652 _marker: Default::default(),
653 _block: Default::default(),
654 })
655 }
656
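	/// Returns a high-level overview of the network status: connected peer count and total
	/// bandwidth counters.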
657 pub fn status(&self) -> NetworkStatus {
659 NetworkStatus {
660 num_connected_peers: self.num_connected_peers(),
661 total_bytes_inbound: self.total_bytes_inbound(),
662 total_bytes_outbound: self.total_bytes_outbound(),
663 }
664 }
665
666 pub fn total_bytes_inbound(&self) -> u64 {
668 self.service.bandwidth.total_inbound()
669 }
670
671 pub fn total_bytes_outbound(&self) -> u64 {
673 self.service.bandwidth.total_outbound()
674 }
675
676 pub fn num_connected_peers(&self) -> usize {
678 self.network_service.behaviour().user_protocol().num_sync_peers()
679 }
680
681 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
683 self.network_service.behaviour_mut().add_known_address(peer_id, addr);
684 }
685
686 pub fn service(&self) -> &Arc<NetworkService<B, H>> {
689 &self.service
690 }
691
692 pub fn local_peer_id(&self) -> &PeerId {
694 Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
695 }
696
697 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
701 Swarm::<Behaviour<B>>::listeners(&self.network_service)
702 }
703
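	/// Collects a snapshot of the current network state directly from the swarm (local
	/// addresses plus debug information about connected and known-but-not-connected peers),
	/// mainly for RPC and debugging purposes.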
704 pub fn network_state(&mut self) -> NetworkState {
709 let swarm = &mut self.network_service;
710 let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
711 let connected_peers = {
712 let swarm = &mut *swarm;
713 open.iter()
714 .filter_map(move |peer_id| {
715 let known_addresses = if let Ok(addrs) =
716 NetworkBehaviour::handle_pending_outbound_connection(
717 swarm.behaviour_mut(),
						// The connection ID is unused by this call; pass a dummy value.
						ConnectionId::new_unchecked(0),
						Some(*peer_id),
720 &vec![],
721 Endpoint::Listener,
722 ) {
723 addrs.into_iter().collect()
724 } else {
725 error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id);
726 return None
727 };
728
729 let endpoint = if let Some(e) =
730 swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
731 {
732 e.clone().into()
733 } else {
734 error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
735 and debug information about {:?}", peer_id);
736 return None
737 };
738
739 Some((
740 peer_id.to_base58(),
741 NetworkStatePeer {
742 endpoint,
743 version_string: swarm
744 .behaviour_mut()
745 .node(peer_id)
746 .and_then(|i| i.client_version().map(|s| s.to_owned())),
747 latest_ping_time: swarm
748 .behaviour_mut()
749 .node(peer_id)
750 .and_then(|i| i.latest_ping()),
751 known_addresses,
752 },
753 ))
754 })
755 .collect()
756 };
757
758 let not_connected_peers = {
759 let swarm = &mut *swarm;
760 swarm
761 .behaviour_mut()
762 .known_peers()
763 .into_iter()
764 .filter(|p| open.iter().all(|n| n != p))
765 .map(move |peer_id| {
766 let known_addresses = if let Ok(addrs) =
767 NetworkBehaviour::handle_pending_outbound_connection(
768 swarm.behaviour_mut(),
						// The connection ID is unused by this call; pass a dummy value.
						ConnectionId::new_unchecked(0),
						Some(peer_id),
771 &vec![],
772 Endpoint::Listener,
773 ) {
774 addrs.into_iter().collect()
775 } else {
776 error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id);
777 Default::default()
778 };
779
780 (
781 peer_id.to_base58(),
782 NetworkStateNotConnectedPeer {
783 version_string: swarm
784 .behaviour_mut()
785 .node(&peer_id)
786 .and_then(|i| i.client_version().map(|s| s.to_owned())),
787 latest_ping_time: swarm
788 .behaviour_mut()
789 .node(&peer_id)
790 .and_then(|i| i.latest_ping()),
791 known_addresses,
792 },
793 )
794 })
795 .collect()
796 };
797
798 let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
799 let listened_addresses = swarm.listeners().cloned().collect();
800 let external_addresses = swarm.external_addresses().cloned().collect();
801
802 NetworkState {
803 peer_id,
804 listened_addresses,
805 external_addresses,
806 connected_peers,
807 not_connected_peers,
808 peerset: serde_json::json!(
811 "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
812 ),
813 }
814 }
815
816 pub fn remove_reserved_peer(&self, peer: PeerId) {
818 self.service.remove_reserved_peer(peer.into());
819 }
820
821 pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
823 self.service.add_reserved_peer(peer)
824 }
825}
826
827impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
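	/// Returns the network state of the background worker by sending a
	/// [`ServiceToWorkerMsg::NetworkState`] request and awaiting the response. Fails if the
	/// worker has shut down.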
828 pub async fn network_state(&self) -> Result<NetworkState, ()> {
835 let (tx, rx) = oneshot::channel();
836
837 let _ = self
838 .to_worker
839 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
840
841 match rx.await {
842 Ok(v) => v.map_err(|_| ()),
843 Err(_) => Err(()),
845 }
846 }
847
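	/// Splits a set of multiaddresses ending in a `/p2p/<peer id>` component into the peer ID
	/// and the remaining address, rejecting addresses without a peer ID or that refer to the
	/// local peer.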
848 fn split_multiaddr_and_peer_id(
853 &self,
854 peers: HashSet<Multiaddr>,
855 ) -> Result<Vec<(PeerId, Multiaddr)>, String> {
856 peers
857 .into_iter()
858 .map(|mut addr| {
859 let peer = match addr.pop() {
860 Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
861 _ => return Err("Missing PeerId from address".to_string()),
862 };
863
864 if peer == self.local_peer_id {
867 Err("Local peer ID in peer set.".to_string())
868 } else {
869 Ok((peer, addr))
870 }
871 })
872 .collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
873 }
874}
875
876impl<B, H> NetworkStateInfo for NetworkService<B, H>
877where
878 B: sp_runtime::traits::Block,
879 H: ExHashT,
880{
881 fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
883 self.external_addresses.lock().iter().cloned().map(Into::into).collect()
884 }
885
886 fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
888 self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
889 }
890
891 fn local_peer_id(&self) -> sc_network_types::PeerId {
893 self.local_peer_id.into()
894 }
895}
896
897impl<B, H> NetworkSigner for NetworkService<B, H>
898where
899 B: sp_runtime::traits::Block,
900 H: ExHashT,
901{
902 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
903 let public_key = self.local_identity.public();
904 let bytes = self.local_identity.sign(msg.as_ref())?;
905
906 Ok(Signature {
907 public_key: crate::service::signature::PublicKey::Libp2p(public_key),
908 bytes,
909 })
910 }
911
912 fn verify(
913 &self,
914 peer_id: sc_network_types::PeerId,
915 public_key: &Vec<u8>,
916 signature: &Vec<u8>,
917 message: &Vec<u8>,
918 ) -> Result<bool, String> {
919 let public_key =
920 PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
921 let peer_id: PeerId = peer_id.into();
922 let remote: libp2p::PeerId = public_key.to_peer_id();
923
924 Ok(peer_id == remote && public_key.verify(message, signature))
925 }
926}
927
928impl<B, H> NetworkDHTProvider for NetworkService<B, H>
929where
930 B: BlockT + 'static,
931 H: ExHashT,
932{
933 fn get_value(&self, key: &KademliaKey) {
938 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
939 }
940
941 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
946 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
947 }
948
949 fn put_record_to(
950 &self,
951 record: Record,
952 peers: HashSet<sc_network_types::PeerId>,
953 update_local_storage: bool,
954 ) {
955 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
956 record,
957 peers,
958 update_local_storage,
959 });
960 }
961
962 fn store_record(
963 &self,
964 key: KademliaKey,
965 value: Vec<u8>,
966 publisher: Option<sc_network_types::PeerId>,
967 expires: Option<Instant>,
968 ) {
969 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
970 key,
971 value,
972 publisher.map(Into::into),
973 expires,
974 ));
975 }
976}
977
978#[async_trait::async_trait]
979impl<B, H> NetworkStatusProvider for NetworkService<B, H>
980where
981 B: BlockT + 'static,
982 H: ExHashT,
983{
984 async fn status(&self) -> Result<NetworkStatus, ()> {
985 let (tx, rx) = oneshot::channel();
986
987 let _ = self
988 .to_worker
989 .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
990
991 match rx.await {
992 Ok(v) => v.map_err(|_| ()),
993 Err(_) => Err(()),
995 }
996 }
997
998 async fn network_state(&self) -> Result<NetworkState, ()> {
999 let (tx, rx) = oneshot::channel();
1000
1001 let _ = self
1002 .to_worker
1003 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
1004
1005 match rx.await {
1006 Ok(v) => v.map_err(|_| ()),
1007 Err(_) => Err(()),
1009 }
1010 }
1011}
1012
1013#[async_trait::async_trait]
1014impl<B, H> NetworkPeers for NetworkService<B, H>
1015where
1016 B: BlockT + 'static,
1017 H: ExHashT,
1018{
1019 fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
1020 self.sync_protocol_handle
1021 .set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
1022 }
1023
1024 fn set_authorized_only(&self, reserved_only: bool) {
1025 self.sync_protocol_handle.set_reserved_only(reserved_only);
1026 }
1027
1028 fn add_known_address(
1029 &self,
1030 peer_id: sc_network_types::PeerId,
1031 addr: sc_network_types::multiaddr::Multiaddr,
1032 ) {
1033 let _ = self
1034 .to_worker
1035 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1036 }
1037
1038 fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
1039 self.peer_store_handle.report_peer(peer_id, cost_benefit);
1040 }
1041
1042 fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
1043 self.peer_store_handle.peer_reputation(peer_id)
1044 }
1045
1046 fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
1047 let _ = self
1048 .to_worker
1049 .unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1050 }
1051
1052 fn accept_unreserved_peers(&self) {
1053 self.sync_protocol_handle.set_reserved_only(false);
1054 }
1055
1056 fn deny_unreserved_peers(&self) {
1057 self.sync_protocol_handle.set_reserved_only(true);
1058 }
1059
1060 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1061 if peer.peer_id == self.local_peer_id.into() {
1063 return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1064 }
1065
1066 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1067 peer.peer_id.into(),
1068 peer.multiaddr.into(),
1069 ));
1070 self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1071
1072 Ok(())
1073 }
1074
1075 fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
1076 self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1077 }
1078
1079 fn set_reserved_peers(
1080 &self,
1081 protocol: ProtocolName,
1082 peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1083 ) -> Result<(), String> {
1084 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1085 return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol))
1086 };
1087
1088 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1089 let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1090
1091 let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1092
1093 for (peer_id, addr) in peers_addrs.into_iter() {
1094 if peer_id == self.local_peer_id {
1096 return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1097 }
1098
1099 peers.insert(peer_id.into());
1100
1101 if !addr.is_empty() {
1102 let _ = self
1103 .to_worker
1104 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1105 }
1106 }
1107
1108 self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1109
1110 Ok(())
1111 }
1112
1113 fn add_peers_to_reserved_set(
1114 &self,
1115 protocol: ProtocolName,
1116 peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1117 ) -> Result<(), String> {
1118 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1119 return Err(format!(
1120 "Cannot add peers to reserved set of unknown protocol: {}",
1121 protocol
1122 ))
1123 };
1124
1125 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1126 let peers = self.split_multiaddr_and_peer_id(peers)?;
1127
1128 for (peer_id, addr) in peers.into_iter() {
1129 if peer_id == self.local_peer_id {
1131 return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1132 }
1133
1134 if !addr.is_empty() {
1135 let _ = self
1136 .to_worker
1137 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1138 }
1139
1140 self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1141 }
1142
1143 Ok(())
1144 }
1145
1146 fn remove_peers_from_reserved_set(
1147 &self,
1148 protocol: ProtocolName,
1149 peers: Vec<sc_network_types::PeerId>,
1150 ) -> Result<(), String> {
1151 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1152 return Err(format!(
1153 "Cannot remove peers from reserved set of unknown protocol: {}",
1154 protocol
1155 ))
1156 };
1157
1158 for peer_id in peers.into_iter() {
1159 self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1160 }
1161
1162 Ok(())
1163 }
1164
1165 fn sync_num_connected(&self) -> usize {
1166 self.num_connected.load(Ordering::Relaxed)
1167 }
1168
1169 fn peer_role(
1170 &self,
1171 peer_id: sc_network_types::PeerId,
1172 handshake: Vec<u8>,
1173 ) -> Option<ObservedRole> {
1174 match Roles::decode_all(&mut &handshake[..]) {
1175 Ok(role) => Some(role.into()),
1176 Err(_) => {
1177 log::debug!(target: "sub-libp2p", "handshake doesn't contain peer role: {handshake:?}");
1178 self.peer_store_handle.peer_role(&(peer_id.into()))
1179 },
1180 }
1181 }
1182
1183 async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
1187 let (tx, rx) = oneshot::channel();
1188
1189 self.sync_protocol_handle.reserved_peers(tx);
1190
1191 rx.await
1193 .map(|peers| peers.into_iter().map(From::from).collect())
1194 .map_err(|_| ())
1195 }
1196}
1197
1198impl<B, H> NetworkEventStream for NetworkService<B, H>
1199where
1200 B: BlockT + 'static,
1201 H: ExHashT,
1202{
1203 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1204 let (tx, rx) = out_events::channel(name, 100_000);
1205 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1206 Box::pin(rx)
1207 }
1208}
1209
1210#[async_trait::async_trait]
1211impl<B, H> NetworkRequest for NetworkService<B, H>
1212where
1213 B: BlockT + 'static,
1214 H: ExHashT,
1215{
1216 async fn request(
1217 &self,
1218 target: sc_network_types::PeerId,
1219 protocol: ProtocolName,
1220 request: Vec<u8>,
1221 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1222 connect: IfDisconnected,
1223 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1224 let (tx, rx) = oneshot::channel();
1225
1226 self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1227
1228 match rx.await {
1229 Ok(v) => v,
1230 Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1234 }
1235 }
1236
1237 fn start_request(
1238 &self,
1239 target: sc_network_types::PeerId,
1240 protocol: ProtocolName,
1241 request: Vec<u8>,
1242 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1243 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1244 connect: IfDisconnected,
1245 ) {
1246 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1247 target: target.into(),
1248 protocol: protocol.into(),
1249 request,
1250 fallback_request,
1251 pending_response: tx,
1252 connect,
1253 });
1254 }
1255}
1256
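/// Handle for sending notifications to a specific peer on a fixed notification protocol.
///
/// Illustrative flow (not a doctest; `sender` is assumed to be a `NotificationSender` and the
/// sender traits are assumed to be in scope):
///
/// ```ignore
/// let mut slot = sender.ready().await?;   // wait for space in the peer's buffer
/// slot.send(b"notification".to_vec())?;   // consume the reserved slot
/// ```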
#[must_use]
pub struct NotificationSender {
	sink: NotificationsSink,

	protocol_name: ProtocolName,

	notification_size_metric: Option<Histogram>,
}
1269
1270#[async_trait::async_trait]
1271impl NotificationSenderT for NotificationSender {
1272 async fn ready(
1273 &self,
1274 ) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1275 Ok(Box::new(NotificationSenderReady {
1276 ready: match self.sink.reserve_notification().await {
1277 Ok(r) => Some(r),
1278 Err(()) => return Err(NotificationSenderError::Closed),
1279 },
1280 peer_id: self.sink.peer_id(),
1281 protocol_name: &self.protocol_name,
1282 notification_size_metric: self.notification_size_metric.clone(),
1283 }))
1284 }
1285}
1286
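/// A slot reserved in the peer's notification buffer, ready to accept exactly one notification.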
#[must_use]
pub struct NotificationSenderReady<'a> {
	ready: Option<Ready<'a>>,

	peer_id: &'a PeerId,

	protocol_name: &'a ProtocolName,

	notification_size_metric: Option<Histogram>,
}
1302
1303impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1304 fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1305 if let Some(notification_size_metric) = &self.notification_size_metric {
1306 notification_size_metric.observe(notification.len() as f64);
1307 }
1308
1309 trace!(
1310 target: "sub-libp2p",
1311 "External API => Notification({:?}, {}, {} bytes)",
1312 self.peer_id, self.protocol_name, notification.len(),
1313 );
1314 trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id);
1315
1316 self.ready
1317 .take()
1318 .ok_or(NotificationSenderError::Closed)?
1319 .send(notification)
1320 .map_err(|()| NotificationSenderError::Closed)
1321 }
1322}
1323
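/// Messages sent from the [`NetworkService`] front-end to the background [`NetworkWorker`].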
enum ServiceToWorkerMsg {
	GetValue(KademliaKey),
	PutValue(KademliaKey, Vec<u8>),
	PutRecordTo {
		record: Record,
		peers: HashSet<sc_network_types::PeerId>,
		update_local_storage: bool,
	},
	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
	AddKnownAddress(PeerId, Multiaddr),
	EventStream(out_events::Sender),
	Request {
		target: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		fallback_request: Option<(Vec<u8>, ProtocolName)>,
		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		connect: IfDisconnected,
	},
	NetworkStatus {
		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
	},
	NetworkState {
		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
	},
	DisconnectPeer(PeerId, ProtocolName),
}
1354
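/// Main network worker. Must be polled (see [`NetworkWorker::run`]) in order for the network to
/// advance; owns the libp2p `Swarm` and services requests coming from the [`NetworkService`].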
1355#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	num_connected: Arc<AtomicUsize>,
	service: Arc<NetworkService<B, H>>,
	network_service: Swarm<Behaviour<B>>,
	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
	event_streams: out_events::OutChannels,
	metrics: Option<Metrics>,
	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
	reported_invalid_boot_nodes: HashSet<PeerId>,
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
	_marker: PhantomData<H>,
	_block: PhantomData<B>,
}
1392
1393impl<B, H> NetworkWorker<B, H>
1394where
1395 B: BlockT + 'static,
1396 H: ExHashT,
1397{
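	/// Main loop of the worker: repeatedly calls [`Self::next_action`] until the channel from
	/// the [`NetworkService`] is closed.
	///
	/// Illustrative usage sketch (not part of this module; `params` is assumed to be built by
	/// the caller and a tokio runtime is assumed to be available):
	///
	/// ```ignore
	/// let worker = NetworkWorker::new(params)?;
	/// let service = worker.service().clone();
	/// tokio::spawn(worker.run());
	/// // `service` can now be used to query status, open event streams, send requests, ...
	/// ```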
1398 pub async fn run(mut self) {
1400 while self.next_action().await {}
1401 }
1402
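	/// Performs one unit of work: processes the next message from the service or the next swarm
	/// event, then updates the Prometheus metrics. Returns `false` when the worker should shut
	/// down because the service channel was closed.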
1403 pub async fn next_action(&mut self) -> bool {
1408 futures::select! {
1409 msg = self.from_service.next() => {
1411 if let Some(msg) = msg {
1412 self.handle_worker_message(msg);
1413 } else {
1414 return false
1415 }
1416 },
1417 event = self.network_service.select_next_some() => {
1419 self.handle_swarm_event(event);
1420 },
1421 };
1422
1423 let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1425 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1426
1427 if let Some(metrics) = self.metrics.as_ref() {
1428 if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1429 for (lower_ilog2_bucket_bound, num_entries) in buckets {
1430 metrics
1431 .kbuckets_num_nodes
1432 .with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1433 .set(num_entries as u64);
1434 }
1435 }
1436 if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1437 metrics.kademlia_records_count.set(num_entries as u64);
1438 }
1439 if let Some(num_entries) =
1440 self.network_service.behaviour_mut().kademlia_records_total_size()
1441 {
1442 metrics.kademlia_records_sizes_total.set(num_entries as u64);
1443 }
1444
1445 metrics.pending_connections.set(
1446 Swarm::network_info(&self.network_service).connection_counters().num_pending()
1447 as u64,
1448 );
1449 }
1450
1451 true
1452 }
1453
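	/// Processes a message coming from the foreground [`NetworkService`].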
1454 fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1456 match msg {
1457 ServiceToWorkerMsg::GetValue(key) =>
1458 self.network_service.behaviour_mut().get_value(key),
1459 ServiceToWorkerMsg::PutValue(key, value) =>
1460 self.network_service.behaviour_mut().put_value(key, value),
1461 ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1462 .network_service
1463 .behaviour_mut()
1464 .put_record_to(record, peers, update_local_storage),
1465 ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1466 .network_service
1467 .behaviour_mut()
1468 .store_record(key, value, publisher, expires),
1469 ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
1470 self.network_service.behaviour_mut().add_known_address(peer_id, addr),
1471 ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1472 ServiceToWorkerMsg::Request {
1473 target,
1474 protocol,
1475 request,
1476 fallback_request,
1477 pending_response,
1478 connect,
1479 } => {
1480 self.network_service.behaviour_mut().send_request(
1481 &target,
1482 protocol,
1483 request,
1484 fallback_request,
1485 pending_response,
1486 connect,
1487 );
1488 },
1489 ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1490 let _ = pending_response.send(Ok(self.status()));
1491 },
1492 ServiceToWorkerMsg::NetworkState { pending_response } => {
1493 let _ = pending_response.send(Ok(self.network_state()));
1494 },
1495 ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1496 .network_service
1497 .behaviour_mut()
1498 .user_protocol_mut()
1499 .disconnect_peer(&who, protocol_name),
1500 }
1501 }
1502
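	/// Processes an event emitted by the libp2p swarm: request/response bookkeeping,
	/// notification substream events, DHT events, connection lifecycle, and the related metrics.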
1503 #[allow(deprecated)]
1505 fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut, THandlerErr<Behaviour<B>>>) {
1506 match event {
1507 SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1508 if let Some(metrics) = self.metrics.as_ref() {
1509 match result {
1510 Ok(serve_time) => {
1511 metrics
1512 .requests_in_success_total
1513 .with_label_values(&[&protocol])
1514 .observe(serve_time.as_secs_f64());
1515 },
1516 Err(err) => {
1517 let reason = match err {
1518 ResponseFailure::Network(InboundFailure::Timeout) =>
1519 Some("timeout"),
1520 ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1521 None,
1526 ResponseFailure::Network(InboundFailure::ResponseOmission) =>
1527 Some("busy-omitted"),
1528 ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
1529 Some("connection-closed"),
1530 };
1531
1532 if let Some(reason) = reason {
1533 metrics
1534 .requests_in_failure_total
1535 .with_label_values(&[&protocol, reason])
1536 .inc();
1537 }
1538 },
1539 }
1540 }
1541 },
1542 SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1543 protocol,
1544 duration,
1545 result,
1546 ..
1547 }) =>
1548 if let Some(metrics) = self.metrics.as_ref() {
1549 match result {
1550 Ok(_) => {
1551 metrics
1552 .requests_out_success_total
1553 .with_label_values(&[&protocol])
1554 .observe(duration.as_secs_f64());
1555 },
1556 Err(err) => {
1557 let reason = match err {
1558 RequestFailure::NotConnected => "not-connected",
1559 RequestFailure::UnknownProtocol => "unknown-protocol",
1560 RequestFailure::Refused => "refused",
1561 RequestFailure::Obsolete => "obsolete",
1562 RequestFailure::Network(OutboundFailure::DialFailure) =>
1563 "dial-failure",
1564 RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1565 RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
1566 "connection-closed",
1567 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
1568 "unsupported",
1569 };
1570
1571 metrics
1572 .requests_out_failure_total
1573 .with_label_values(&[&protocol, reason])
1574 .inc();
1575 },
1576 }
1577 },
1578 SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1579 for change in changes {
1580 self.peer_store_handle.report_peer(peer.into(), change);
1581 }
1582 },
1583 SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1584 peer_id,
1585 info:
1586 IdentifyInfo {
1587 protocol_version, agent_version, mut listen_addrs, protocols, ..
1588 },
1589 }) => {
1590 if listen_addrs.len() > 30 {
1591 debug!(
1592 target: "sub-libp2p",
1593 "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1594 peer_id, protocol_version, agent_version
1595 );
1596 listen_addrs.truncate(30);
1597 }
1598 for addr in listen_addrs {
1599 self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1600 &peer_id,
1601 &protocols,
1602 addr.clone(),
1603 );
1604 }
1605 self.peer_store_handle.add_known_peer(peer_id.into());
1606 },
1607 SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1608 self.peer_store_handle.add_known_peer(peer_id.into());
1609 },
1610 SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1611 if let Some(metrics) = self.metrics.as_ref() {
1612 metrics.kademlia_random_queries_total.inc();
1613 }
1614 },
1615 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1616 remote,
1617 set_id,
1618 direction,
1619 negotiated_fallback,
1620 notifications_sink,
1621 received_handshake,
1622 }) => {
1623 let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1624 remote,
1625 direction,
1626 received_handshake,
1627 negotiated_fallback,
1628 notifications_sink,
1629 );
1630 },
1631 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1632 remote,
1633 set_id,
1634 notifications_sink,
1635 }) => {
1636 let _ = self.notif_protocol_handles[usize::from(set_id)]
1637 .report_notification_sink_replaced(remote, notifications_sink);
			},
1660 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1661 let _ = self.notif_protocol_handles[usize::from(set_id)]
1662 .report_substream_closed(remote);
1663 },
1664 SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1665 remote,
1666 set_id,
1667 notification,
1668 }) => {
1669 let _ = self.notif_protocol_handles[usize::from(set_id)]
1670 .report_notification_received(remote, notification);
1671 },
1672 SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1673 match (self.metrics.as_ref(), duration) {
1674 (Some(metrics), Some(duration)) => {
1675 let query_type = match event {
1676 DhtEvent::ValueFound(_) => "value-found",
1677 DhtEvent::ValueNotFound(_) => "value-not-found",
1678 DhtEvent::ValuePut(_) => "value-put",
1679 DhtEvent::ValuePutFailed(_) => "value-put-failed",
1680 DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1681 };
1682 metrics
1683 .kademlia_query_duration
1684 .with_label_values(&[query_type])
1685 .observe(duration.as_secs_f64());
1686 },
1687 _ => {},
1688 }
1689
1690 self.event_streams.send(Event::Dht(event));
1691 },
1692 SwarmEvent::Behaviour(BehaviourOut::None) => {
1693 },
1695 SwarmEvent::ConnectionEstablished {
1696 peer_id,
1697 endpoint,
1698 num_established,
1699 concurrent_dial_errors,
1700 ..
1701 } => {
1702 if let Some(errors) = concurrent_dial_errors {
1703 debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1704 } else {
1705 debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
1706 }
1707
1708 if let Some(metrics) = self.metrics.as_ref() {
1709 let direction = match endpoint {
1710 ConnectedPoint::Dialer { .. } => "out",
1711 ConnectedPoint::Listener { .. } => "in",
1712 };
1713 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1714
1715 if num_established.get() == 1 {
1716 metrics.distinct_peers_connections_opened_total.inc();
1717 }
1718 }
1719 },
1720 SwarmEvent::ConnectionClosed {
1721 connection_id,
1722 peer_id,
1723 cause,
1724 endpoint,
1725 num_established,
1726 } => {
1727 debug!(target: "sub-libp2p", "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1728 if let Some(metrics) = self.metrics.as_ref() {
1729 let direction = match endpoint {
1730 ConnectedPoint::Dialer { .. } => "out",
1731 ConnectedPoint::Listener { .. } => "in",
1732 };
1733 let reason = match cause {
1734 Some(ConnectionError::IO(_)) => "transport-error",
1735 Some(ConnectionError::Handler(Either::Left(Either::Left(
1736 Either::Left(Either::Right(
1737 NotifsHandlerError::SyncNotificationsClogged,
1738 )),
1739 )))) => "sync-notifications-clogged",
1740 Some(ConnectionError::Handler(Either::Left(Either::Left(
1741 Either::Right(Either::Left(_)),
1742 )))) => "ping-timeout",
1743 Some(ConnectionError::Handler(_)) => "protocol-error",
1744 Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1745 None => "actively-closed",
1746 };
1747 metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1748
1749 if num_established == 0 {
1751 metrics.distinct_peers_connections_closed_total.inc();
1752 }
1753 }
1754 },
1755 SwarmEvent::NewListenAddr { address, .. } => {
1756 trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
1757 if let Some(metrics) = self.metrics.as_ref() {
1758 metrics.listeners_local_addresses.inc();
1759 }
1760 self.listen_addresses.lock().insert(address.clone());
1761 },
1762 SwarmEvent::ExpiredListenAddr { address, .. } => {
1763 info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
1764 if let Some(metrics) = self.metrics.as_ref() {
1765 metrics.listeners_local_addresses.dec();
1766 }
1767 self.listen_addresses.lock().remove(&address);
1768 },
1769 SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1770 if let Some(peer_id) = peer_id {
1771 trace!(
1772 target: "sub-libp2p",
1773 "Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1774 );
1775
1776 let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1777
1778 if let Some(addresses) =
1779 not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1780 {
1781 if let DialError::WrongPeerId { obtained, endpoint } = &error {
1782 if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint {
1783 let address_without_peer_id = parse_addr(address.clone().into())
1784 .map_or_else(|_| address.clone(), |r| r.1.into());
1785
1786 if addresses.iter().any(|a| address_without_peer_id == *a) {
1790 warn!(
1791 "💔 The bootnode you want to connect to at `{address}` provided a \
1792 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1793 );
1794
1795 self.reported_invalid_boot_nodes.insert(peer_id);
1796 }
1797 }
1798 }
1799 }
1800 }
1801
1802 if let Some(metrics) = self.metrics.as_ref() {
1803 #[allow(deprecated)]
1804 let reason = match error {
1805 DialError::Denied { cause } =>
1806 if cause.downcast::<Exceeded>().is_ok() {
1807 Some("limit-reached")
1808 } else {
1809 None
1810 },
1811 DialError::LocalPeerId { .. } => Some("local-peer-id"),
1812 DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1813 DialError::Transport(_) => Some("transport-error"),
1814 DialError::NoAddresses |
1815 DialError::DialPeerConditionFalse(_) |
					DialError::Aborted => None,
				};

1818 if let Some(reason) = reason {
1819 metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1820 }
1821 }
1822 },
1823 SwarmEvent::Dialing { connection_id, peer_id } => {
1824 trace!(target: "sub-libp2p", "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1825 },
1826 SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1827 trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1828 if let Some(metrics) = self.metrics.as_ref() {
1829 metrics.incoming_connections_total.inc();
1830 }
1831 },
1832 SwarmEvent::IncomingConnectionError {
1833 connection_id,
1834 local_addr,
1835 send_back_addr,
1836 error,
1837 } => {
1838 debug!(
1839 target: "sub-libp2p",
1840 "Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1841 );
1842 if let Some(metrics) = self.metrics.as_ref() {
1843 #[allow(deprecated)]
1844 let reason = match error {
1845 ListenError::Denied { cause } =>
1846 if cause.downcast::<Exceeded>().is_ok() {
1847 Some("limit-reached")
1848 } else {
1849 None
1850 },
1851 ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } =>
1852 Some("invalid-peer-id"),
1853 ListenError::Transport(_) => Some("transport-error"),
					ListenError::Aborted => None,
				};
1856
1857 if let Some(reason) = reason {
1858 metrics
1859 .incoming_connections_errors_total
1860 .with_label_values(&[reason])
1861 .inc();
1862 }
1863 }
1864 },
1865 SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1866 if let Some(metrics) = self.metrics.as_ref() {
1867 metrics.listeners_local_addresses.sub(addresses.len() as u64);
1868 }
1869 let mut listen_addresses = self.listen_addresses.lock();
1870 for addr in &addresses {
1871 listen_addresses.remove(addr);
1872 }
1873 drop(listen_addresses);
1874
1875 let addrs =
1876 addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1877 match reason {
1878 Ok(()) => error!(
1879 target: "sub-libp2p",
1880 "📪 Libp2p listener ({}) closed gracefully",
1881 addrs
1882 ),
1883 Err(e) => error!(
1884 target: "sub-libp2p",
1885 "📪 Libp2p listener ({}) closed: {}",
1886 addrs, e
1887 ),
1888 }
1889 },
1890 SwarmEvent::ListenerError { error, .. } => {
1891 debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
1892 if let Some(metrics) = self.metrics.as_ref() {
1893 metrics.listeners_errors_total.inc();
1894 }
1895 },
1896 }
1897 }
1898}
1899
1900impl<B, H> Unpin for NetworkWorker<B, H>
1901where
1902 B: BlockT + 'static,
1903 H: ExHashT,
1904{
1905}
1906
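/// Verifies that the given addresses are consistent with the configured transport: memory
/// addresses are only accepted with `TransportConfig::MemoryOnly`, and are rejected otherwise.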
1907pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1908 addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1909 transport: &TransportConfig,
1910) -> Result<(), Error> {
1911 use sc_network_types::multiaddr::Protocol;
1912
1913 if matches!(transport, TransportConfig::MemoryOnly) {
1914 let addresses: Vec<_> = addresses
1915 .filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1916 .cloned()
1917 .collect();
1918
1919 if !addresses.is_empty() {
1920 return Err(Error::AddressesForAnotherTransport {
1921 transport: transport.clone(),
1922 addresses,
1923 })
1924 }
1925 } else {
1926 let addresses: Vec<_> = addresses
1927 .filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1928 .cloned()
1929 .collect();
1930
1931 if !addresses.is_empty() {
1932 return Err(Error::AddressesForAnotherTransport {
1933 transport: transport.clone(),
1934 addresses,
1935 })
1936 }
1937 }
1938
1939 Ok(())
1940}