sc_network/
service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Main entry point of the sc-network crate.
20//!
21//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
22//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
23//! terminates only when all instances of the control handles [`NetworkService`] were dropped.
24//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
25//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
26//!
27//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
28//! which is then processed by [`NetworkWorker::next_action`].
29
30use crate::{
31	behaviour::{self, Behaviour, BehaviourOut},
32	bitswap::BitswapRequestHandler,
33	config::{
34		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36	},
37	discovery::DiscoveryConfig,
38	error::Error,
39	event::{DhtEvent, Event},
40	network_state::{
41		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42	},
43	peer_store::{PeerStore, PeerStoreProvider},
44	protocol::{self, NotifsHandlerError, Protocol, Ready},
45	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47	service::{
48		signature::{Signature, SigningError},
49		traits::{
50			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54		},
55	},
56	transport,
57	types::ProtocolName,
58	NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use either::Either;
63use futures::{channel::oneshot, prelude::*};
64#[allow(deprecated)]
65use libp2p::swarm::THandlerErr;
66use libp2p::{
67	connection_limits::{ConnectionLimits, Exceeded},
68	core::{upgrade, ConnectedPoint, Endpoint},
69	identify::Info as IdentifyInfo,
70	identity::ed25519,
71	kad::{record::Key as KademliaKey, Record},
72	multiaddr::{self, Multiaddr},
73	swarm::{
74		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
75		NetworkBehaviour, Swarm, SwarmEvent,
76	},
77	PeerId,
78};
79use log::{debug, error, info, trace, warn};
80use metrics::{Histogram, MetricSources, Metrics};
81use parking_lot::Mutex;
82use prometheus_endpoint::Registry;
83
84use sc_client_api::BlockBackend;
85use sc_network_common::{
86	role::{ObservedRole, Roles},
87	ExHashT,
88};
89use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
90use sp_runtime::traits::Block as BlockT;
91
92pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
93pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
94pub use metrics::NotificationMetrics;
95pub use protocol::NotificationsSink;
96use std::{
97	cmp,
98	collections::{HashMap, HashSet},
99	fs, iter,
100	marker::PhantomData,
101	num::NonZeroUsize,
102	pin::Pin,
103	str,
104	sync::{
105		atomic::{AtomicUsize, Ordering},
106		Arc,
107	},
108	time::{Duration, Instant},
109};
110
111pub(crate) mod metrics;
112pub(crate) mod out_events;
113
114pub mod signature;
115pub mod traits;
116
117struct Libp2pBandwidthSink {
118	sink: Arc<transport::BandwidthSinks>,
119}
120
121impl BandwidthSink for Libp2pBandwidthSink {
122	fn total_inbound(&self) -> u64 {
123		self.sink.total_inbound()
124	}
125
126	fn total_outbound(&self) -> u64 {
127		self.sink.total_outbound()
128	}
129}
130
131/// Substrate network service. Handles network IO and manages connectivity.
132pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
133	/// Number of peers we're connected to.
134	num_connected: Arc<AtomicUsize>,
135	/// The local external addresses.
136	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
137	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
138	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
139	/// Local copy of the `PeerId` of the local node.
140	local_peer_id: PeerId,
141	/// The `KeyPair` that defines the `PeerId` of the local node.
142	local_identity: Keypair,
143	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
144	bandwidth: Arc<dyn BandwidthSink>,
145	/// Channel that sends messages to the actual worker.
146	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
147	/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
148	/// initialization.
149	notification_protocol_ids: HashMap<ProtocolName, SetId>,
150	/// Handles to manage peer connections on notification protocols. The vector never changes
151	/// after initialization.
152	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
153	/// Shortcut to sync protocol handle (`protocol_handles[0]`).
154	sync_protocol_handle: protocol_controller::ProtocolHandle,
155	/// Handle to `PeerStore`.
156	peer_store_handle: Arc<dyn PeerStoreProvider>,
157	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
158	/// compatibility.
159	_marker: PhantomData<H>,
160	/// Marker for block type
161	_block: PhantomData<B>,
162}
163
164#[async_trait::async_trait]
165impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
166where
167	B: BlockT + 'static,
168	H: ExHashT,
169{
170	type NotificationProtocolConfig = NonDefaultSetConfig;
171	type RequestResponseProtocolConfig = RequestResponseConfig;
172	type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
173	type PeerStore = PeerStore;
174	type BitswapConfig = RequestResponseConfig;
175
176	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
177	where
178		Self: Sized,
179	{
180		NetworkWorker::new(params)
181	}
182
183	/// Get handle to `NetworkService` of the `NetworkBackend`.
184	fn network_service(&self) -> Arc<dyn NetworkServiceT> {
185		self.service.clone()
186	}
187
188	/// Create `PeerStore`.
189	fn peer_store(
190		bootnodes: Vec<sc_network_types::PeerId>,
191		metrics_registry: Option<Registry>,
192	) -> Self::PeerStore {
193		PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
194	}
195
196	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
197		NotificationMetrics::new(registry)
198	}
199
200	fn bitswap_server(
201		client: Arc<dyn BlockBackend<B> + Send + Sync>,
202	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
203		let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
204
205		(Box::pin(async move { handler.run().await }), protocol_config)
206	}
207
208	/// Create notification protocol configuration.
209	fn notification_config(
210		protocol_name: ProtocolName,
211		fallback_names: Vec<ProtocolName>,
212		max_notification_size: u64,
213		handshake: Option<NotificationHandshake>,
214		set_config: SetConfig,
215		_metrics: NotificationMetrics,
216		_peerstore_handle: Arc<dyn PeerStoreProvider>,
217	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
218		NonDefaultSetConfig::new(
219			protocol_name,
220			fallback_names,
221			max_notification_size,
222			handshake,
223			set_config,
224		)
225	}
226
227	/// Create request-response protocol configuration.
228	fn request_response_config(
229		protocol_name: ProtocolName,
230		fallback_names: Vec<ProtocolName>,
231		max_request_size: u64,
232		max_response_size: u64,
233		request_timeout: Duration,
234		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
235	) -> Self::RequestResponseProtocolConfig {
236		Self::RequestResponseProtocolConfig {
237			name: protocol_name,
238			fallback_names,
239			max_request_size,
240			max_response_size,
241			request_timeout,
242			inbound_queue,
243		}
244	}
245
246	/// Start [`NetworkBackend`] event loop.
247	async fn run(mut self) {
248		self.run().await
249	}
250}
251
252impl<B, H> NetworkWorker<B, H>
253where
254	B: BlockT + 'static,
255	H: ExHashT,
256{
257	/// Creates the network service.
258	///
259	/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
260	/// for the network processing to advance. From it, you can extract a `NetworkService` using
261	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
262	pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
263		let peer_store_handle = params.network_config.peer_store_handle();
264		let FullNetworkConfiguration {
265			notification_protocols,
266			request_response_protocols,
267			mut network_config,
268			..
269		} = params.network_config;
270
271		// Private and public keys configuration.
272		let local_identity = network_config.node_key.clone().into_keypair()?;
273		let local_public = local_identity.public();
274		let local_peer_id = local_public.to_peer_id();
275
276		// Convert to libp2p types.
277		let local_identity: ed25519::Keypair = local_identity.into();
278		let local_public: ed25519::PublicKey = local_public.into();
279		let local_peer_id: PeerId = local_peer_id.into();
280
281		network_config.boot_nodes = network_config
282			.boot_nodes
283			.into_iter()
284			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
285			.collect();
286		network_config.default_peers_set.reserved_nodes = network_config
287			.default_peers_set
288			.reserved_nodes
289			.into_iter()
290			.filter(|reserved_node| {
291				if reserved_node.peer_id == local_peer_id.into() {
292					warn!(
293						target: "sub-libp2p",
294						"Local peer ID used in reserved node, ignoring: {}",
295						reserved_node,
296					);
297					false
298				} else {
299					true
300				}
301			})
302			.collect();
303
304		// Ensure the listen addresses are consistent with the transport.
305		ensure_addresses_consistent_with_transport(
306			network_config.listen_addresses.iter(),
307			&network_config.transport,
308		)?;
309		ensure_addresses_consistent_with_transport(
310			network_config.boot_nodes.iter().map(|x| &x.multiaddr),
311			&network_config.transport,
312		)?;
313		ensure_addresses_consistent_with_transport(
314			network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
315			&network_config.transport,
316		)?;
317		for notification_protocol in &notification_protocols {
318			ensure_addresses_consistent_with_transport(
319				notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
320				&network_config.transport,
321			)?;
322		}
323		ensure_addresses_consistent_with_transport(
324			network_config.public_addresses.iter(),
325			&network_config.transport,
326		)?;
327
328		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
329
330		if let Some(path) = &network_config.net_config_path {
331			fs::create_dir_all(path)?;
332		}
333
334		info!(
335			target: "sub-libp2p",
336			"🏷  Local node identity is: {}",
337			local_peer_id.to_base58(),
338		);
339		log::info!(target: "sub-libp2p", "Running libp2p network backend");
340
341		let (transport, bandwidth) = {
342			let config_mem = match network_config.transport {
343				TransportConfig::MemoryOnly => true,
344				TransportConfig::Normal { .. } => false,
345			};
346
347			// The yamux buffer size limit is configured to be equal to the maximum frame size
348			// of all protocols. 10 bytes are added to each limit for the length prefix that
349			// is not included in the upper layer protocols limit but is still present in the
350			// yamux buffer. These 10 bytes correspond to the maximum size required to encode
351			// a variable-length-encoding 64bits number. In other words, we make the
352			// assumption that no notification larger than 2^64 will ever be sent.
353			let yamux_maximum_buffer_size = {
354				let requests_max = request_response_protocols
355					.iter()
356					.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::MAX));
357				let responses_max = request_response_protocols
358					.iter()
359					.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
360				let notifs_max = notification_protocols
361					.iter()
362					.map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX));
363
364				// A "default" max is added to cover all the other protocols: ping, identify,
365				// kademlia, block announces, and transactions.
366				let default_max = cmp::max(
367					1024 * 1024,
368					usize::try_from(protocol::BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE)
369						.unwrap_or(usize::MAX),
370				);
371
372				iter::once(default_max)
373					.chain(requests_max)
374					.chain(responses_max)
375					.chain(notifs_max)
376					.max()
377					.expect("iterator known to always yield at least one element; qed")
378					.saturating_add(10)
379			};
380
381			transport::build_transport(
382				local_identity.clone().into(),
383				config_mem,
384				network_config.yamux_window_size,
385				yamux_maximum_buffer_size,
386			)
387		};
388
389		let (to_notifications, from_protocol_controllers) =
390			tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
391
392		// We must prepend a hardcoded default peer set to notification protocols.
393		let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
394			.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
395
396		let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
397			.enumerate()
398			.map(|(set_id, set_config)| {
399				let proto_set_config = ProtoSetConfig {
400					in_peers: set_config.in_peers,
401					out_peers: set_config.out_peers,
402					reserved_nodes: set_config
403						.reserved_nodes
404						.iter()
405						.map(|node| node.peer_id.into())
406						.collect(),
407					reserved_only: set_config.non_reserved_mode.is_reserved_only(),
408				};
409
410				ProtocolController::new(
411					SetId::from(set_id),
412					proto_set_config,
413					to_notifications.clone(),
414					Arc::clone(&peer_store_handle),
415				)
416			})
417			.unzip();
418
419		// Shortcut to default (sync) peer set protocol handle.
420		let sync_protocol_handle = protocol_handles[0].clone();
421
422		// Spawn `ProtocolController` runners.
423		protocol_controllers
424			.into_iter()
425			.for_each(|controller| (params.executor)(controller.run().boxed()));
426
427		// Protocol name to protocol id mapping. The first protocol is always block announce (sync)
428		// protocol, aka default (hardcoded) peer set.
429		let notification_protocol_ids: HashMap<ProtocolName, SetId> =
430			iter::once(&params.block_announce_config)
431				.chain(notification_protocols.iter())
432				.enumerate()
433				.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
434				.collect();
435
436		let known_addresses = {
437			// Collect all reserved nodes and bootnodes addresses.
438			let mut addresses: Vec<_> = network_config
439				.default_peers_set
440				.reserved_nodes
441				.iter()
442				.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
443				.chain(notification_protocols.iter().flat_map(|protocol| {
444					protocol
445						.set_config()
446						.reserved_nodes
447						.iter()
448						.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
449				}))
450				.chain(
451					network_config
452						.boot_nodes
453						.iter()
454						.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
455				)
456				.collect();
457
458			// Remove possible duplicates.
459			addresses.sort();
460			addresses.dedup();
461
462			addresses
463		};
464
465		// Check for duplicate bootnodes.
466		network_config.boot_nodes.iter().try_for_each(|bootnode| {
467			if let Some(other) = network_config
468				.boot_nodes
469				.iter()
470				.filter(|o| o.multiaddr == bootnode.multiaddr)
471				.find(|o| o.peer_id != bootnode.peer_id)
472			{
473				Err(Error::DuplicateBootnode {
474					address: bootnode.multiaddr.clone().into(),
475					first_id: bootnode.peer_id.into(),
476					second_id: other.peer_id.into(),
477				})
478			} else {
479				Ok(())
480			}
481		})?;
482
483		// List of bootnode multiaddresses.
484		let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
485
486		for bootnode in network_config.boot_nodes.iter() {
487			boot_node_ids
488				.entry(bootnode.peer_id.into())
489				.or_default()
490				.push(bootnode.multiaddr.clone().into());
491		}
492
493		let boot_node_ids = Arc::new(boot_node_ids);
494
495		let num_connected = Arc::new(AtomicUsize::new(0));
496		let external_addresses = Arc::new(Mutex::new(HashSet::new()));
497
498		let (protocol, notif_protocol_handles) = Protocol::new(
499			From::from(&params.role),
500			params.notification_metrics,
501			notification_protocols,
502			params.block_announce_config,
503			Arc::clone(&peer_store_handle),
504			protocol_handles.clone(),
505			from_protocol_controllers,
506		)?;
507
508		// Build the swarm.
509		let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
510			let user_agent =
511				format!("{} ({})", network_config.client_version, network_config.node_name);
512
513			let discovery_config = {
514				let mut config = DiscoveryConfig::new(local_peer_id);
515				config.with_permanent_addresses(
516					known_addresses
517						.iter()
518						.map(|(peer, address)| (peer.into(), address.clone().into()))
519						.collect::<Vec<_>>(),
520				);
521				config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
522				config.with_kademlia(
523					params.genesis_hash,
524					params.fork_id.as_deref(),
525					&params.protocol_id,
526				);
527				config.with_dht_random_walk(network_config.enable_dht_random_walk);
528				config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
529				config.use_kademlia_disjoint_query_paths(
530					network_config.kademlia_disjoint_query_paths,
531				);
532				config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
533
534				match network_config.transport {
535					TransportConfig::MemoryOnly => {
536						config.with_mdns(false);
537						config.allow_private_ip(false);
538					},
539					TransportConfig::Normal {
540						enable_mdns,
541						allow_private_ip: allow_private_ipv4,
542						..
543					} => {
544						config.with_mdns(enable_mdns);
545						config.allow_private_ip(allow_private_ipv4);
546					},
547				}
548
549				config
550			};
551
552			let behaviour = {
553				let result = Behaviour::new(
554					protocol,
555					user_agent,
556					local_public.into(),
557					discovery_config,
558					request_response_protocols,
559					Arc::clone(&peer_store_handle),
560					external_addresses.clone(),
561					ConnectionLimits::default()
562						.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
563						.with_max_established_incoming(Some(
564							crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
565						)),
566				);
567
568				match result {
569					Ok(b) => b,
570					Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) =>
571						return Err(Error::DuplicateRequestResponseProtocol { protocol: proto }),
572				}
573			};
574
575			let swarm = {
576				struct SpawnImpl<F>(F);
577				impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
578					fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
579						(self.0)(f)
580					}
581				}
582
583				let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
584					.with_substream_upgrade_protocol_override(upgrade::Version::V1)
585					.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
586					// NOTE: 24 is somewhat arbitrary and should be tuned in the future if
587					// necessary. See <https://github.com/paritytech/substrate/pull/6080>
588					.with_per_connection_event_buffer_size(24)
589					.with_max_negotiating_inbound_streams(2048)
590					.with_idle_connection_timeout(Duration::from_secs(10));
591
592				Swarm::new(transport, behaviour, local_peer_id, config)
593			};
594
595			(swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
596		};
597
598		// Initialize the metrics.
599		let metrics = match &params.metrics_registry {
600			Some(registry) => Some(metrics::register(
601				registry,
602				MetricSources {
603					bandwidth: bandwidth.clone(),
604					connected_peers: num_connected.clone(),
605				},
606			)?),
607			None => None,
608		};
609
610		// Listen on multiaddresses.
611		for addr in &network_config.listen_addresses {
612			if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
613				warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
614			}
615		}
616
617		// Add external addresses.
618		for addr in &network_config.public_addresses {
619			Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
620		}
621
622		let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
623
624		let service = Arc::new(NetworkService {
625			bandwidth,
626			external_addresses,
627			listen_addresses: listen_addresses_set.clone(),
628			num_connected: num_connected.clone(),
629			local_peer_id,
630			local_identity: local_identity.into(),
631			to_worker,
632			notification_protocol_ids,
633			protocol_handles,
634			sync_protocol_handle,
635			peer_store_handle: Arc::clone(&peer_store_handle),
636			_marker: PhantomData,
637			_block: Default::default(),
638		});
639
640		Ok(NetworkWorker {
641			listen_addresses: listen_addresses_set,
642			num_connected,
643			network_service: swarm,
644			service,
645			from_service,
646			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
647			metrics,
648			boot_node_ids,
649			reported_invalid_boot_nodes: Default::default(),
650			peer_store_handle: Arc::clone(&peer_store_handle),
651			notif_protocol_handles,
652			_marker: Default::default(),
653			_block: Default::default(),
654		})
655	}
656
657	/// High-level network status information.
658	pub fn status(&self) -> NetworkStatus {
659		NetworkStatus {
660			num_connected_peers: self.num_connected_peers(),
661			total_bytes_inbound: self.total_bytes_inbound(),
662			total_bytes_outbound: self.total_bytes_outbound(),
663		}
664	}
665
666	/// Returns the total number of bytes received so far.
667	pub fn total_bytes_inbound(&self) -> u64 {
668		self.service.bandwidth.total_inbound()
669	}
670
671	/// Returns the total number of bytes sent so far.
672	pub fn total_bytes_outbound(&self) -> u64 {
673		self.service.bandwidth.total_outbound()
674	}
675
676	/// Returns the number of peers we're connected to.
677	pub fn num_connected_peers(&self) -> usize {
678		self.network_service.behaviour().user_protocol().num_sync_peers()
679	}
680
681	/// Adds an address for a node.
682	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
683		self.network_service.behaviour_mut().add_known_address(peer_id, addr);
684	}
685
686	/// Return a `NetworkService` that can be shared through the code base and can be used to
687	/// manipulate the worker.
688	pub fn service(&self) -> &Arc<NetworkService<B, H>> {
689		&self.service
690	}
691
692	/// Returns the local `PeerId`.
693	pub fn local_peer_id(&self) -> &PeerId {
694		Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
695	}
696
697	/// Returns the list of addresses we are listening on.
698	///
699	/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
700	pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
701		Swarm::<Behaviour<B>>::listeners(&self.network_service)
702	}
703
704	/// Get network state.
705	///
706	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
707	/// everywhere about this. Please don't use this function to retrieve actual information.
708	pub fn network_state(&mut self) -> NetworkState {
709		let swarm = &mut self.network_service;
710		let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
711		let connected_peers = {
712			let swarm = &mut *swarm;
713			open.iter()
714				.filter_map(move |peer_id| {
715					let known_addresses = if let Ok(addrs) =
716						NetworkBehaviour::handle_pending_outbound_connection(
717							swarm.behaviour_mut(),
718							ConnectionId::new_unchecked(0), // dummy value
719							Some(*peer_id),
720							&vec![],
721							Endpoint::Listener,
722						) {
723						addrs.into_iter().collect()
724					} else {
725						error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id);
726						return None
727					};
728
729					let endpoint = if let Some(e) =
730						swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
731					{
732						e.clone().into()
733					} else {
734						error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
735						and debug information about {:?}", peer_id);
736						return None
737					};
738
739					Some((
740						peer_id.to_base58(),
741						NetworkStatePeer {
742							endpoint,
743							version_string: swarm
744								.behaviour_mut()
745								.node(peer_id)
746								.and_then(|i| i.client_version().map(|s| s.to_owned())),
747							latest_ping_time: swarm
748								.behaviour_mut()
749								.node(peer_id)
750								.and_then(|i| i.latest_ping()),
751							known_addresses,
752						},
753					))
754				})
755				.collect()
756		};
757
758		let not_connected_peers = {
759			let swarm = &mut *swarm;
760			swarm
761				.behaviour_mut()
762				.known_peers()
763				.into_iter()
764				.filter(|p| open.iter().all(|n| n != p))
765				.map(move |peer_id| {
766					let known_addresses = if let Ok(addrs) =
767						NetworkBehaviour::handle_pending_outbound_connection(
768							swarm.behaviour_mut(),
769							ConnectionId::new_unchecked(0), // dummy value
770							Some(peer_id),
771							&vec![],
772							Endpoint::Listener,
773						) {
774						addrs.into_iter().collect()
775					} else {
776						error!(target: "sub-libp2p", "Was not able to get known addresses for {:?}", peer_id);
777						Default::default()
778					};
779
780					(
781						peer_id.to_base58(),
782						NetworkStateNotConnectedPeer {
783							version_string: swarm
784								.behaviour_mut()
785								.node(&peer_id)
786								.and_then(|i| i.client_version().map(|s| s.to_owned())),
787							latest_ping_time: swarm
788								.behaviour_mut()
789								.node(&peer_id)
790								.and_then(|i| i.latest_ping()),
791							known_addresses,
792						},
793					)
794				})
795				.collect()
796		};
797
798		let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
799		let listened_addresses = swarm.listeners().cloned().collect();
800		let external_addresses = swarm.external_addresses().cloned().collect();
801
802		NetworkState {
803			peer_id,
804			listened_addresses,
805			external_addresses,
806			connected_peers,
807			not_connected_peers,
808			// TODO: Check what info we can include here.
809			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
810			peerset: serde_json::json!(
811				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
812			),
813		}
814	}
815
816	/// Removes a `PeerId` from the list of reserved peers.
817	pub fn remove_reserved_peer(&self, peer: PeerId) {
818		self.service.remove_reserved_peer(peer.into());
819	}
820
821	/// Adds a `PeerId` and its `Multiaddr` as reserved.
822	pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
823		self.service.add_reserved_peer(peer)
824	}
825}
826
827impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
828	/// Get network state.
829	///
830	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
831	/// everywhere about this. Please don't use this function to retrieve actual information.
832	///
833	/// Returns an error if the `NetworkWorker` is no longer running.
834	pub async fn network_state(&self) -> Result<NetworkState, ()> {
835		let (tx, rx) = oneshot::channel();
836
837		let _ = self
838			.to_worker
839			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
840
841		match rx.await {
842			Ok(v) => v.map_err(|_| ()),
843			// The channel can only be closed if the network worker no longer exists.
844			Err(_) => Err(()),
845		}
846	}
847
848	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
849	///
850	/// Returns an `Err` if one of the given addresses is invalid or contains an
851	/// invalid peer ID (which includes the local peer ID).
852	fn split_multiaddr_and_peer_id(
853		&self,
854		peers: HashSet<Multiaddr>,
855	) -> Result<Vec<(PeerId, Multiaddr)>, String> {
856		peers
857			.into_iter()
858			.map(|mut addr| {
859				let peer = match addr.pop() {
860					Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
861					_ => return Err("Missing PeerId from address".to_string()),
862				};
863
864				// Make sure the local peer ID is never added to the PSM
865				// or added as a "known address", even if given.
866				if peer == self.local_peer_id {
867					Err("Local peer ID in peer set.".to_string())
868				} else {
869					Ok((peer, addr))
870				}
871			})
872			.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
873	}
874}
875
876impl<B, H> NetworkStateInfo for NetworkService<B, H>
877where
878	B: sp_runtime::traits::Block,
879	H: ExHashT,
880{
881	/// Returns the local external addresses.
882	fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
883		self.external_addresses.lock().iter().cloned().map(Into::into).collect()
884	}
885
886	/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
887	fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
888		self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
889	}
890
891	/// Returns the local Peer ID.
892	fn local_peer_id(&self) -> sc_network_types::PeerId {
893		self.local_peer_id.into()
894	}
895}
896
897impl<B, H> NetworkSigner for NetworkService<B, H>
898where
899	B: sp_runtime::traits::Block,
900	H: ExHashT,
901{
902	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
903		let public_key = self.local_identity.public();
904		let bytes = self.local_identity.sign(msg.as_ref())?;
905
906		Ok(Signature {
907			public_key: crate::service::signature::PublicKey::Libp2p(public_key),
908			bytes,
909		})
910	}
911
912	fn verify(
913		&self,
914		peer_id: sc_network_types::PeerId,
915		public_key: &Vec<u8>,
916		signature: &Vec<u8>,
917		message: &Vec<u8>,
918	) -> Result<bool, String> {
919		let public_key =
920			PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
921		let peer_id: PeerId = peer_id.into();
922		let remote: libp2p::PeerId = public_key.to_peer_id();
923
924		Ok(peer_id == remote && public_key.verify(message, signature))
925	}
926}
927
928impl<B, H> NetworkDHTProvider for NetworkService<B, H>
929where
930	B: BlockT + 'static,
931	H: ExHashT,
932{
933	/// Start getting a value from the DHT.
934	///
935	/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an
936	/// item on the [`NetworkWorker`] stream.
937	fn get_value(&self, key: &KademliaKey) {
938		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
939	}
940
941	/// Start putting a value in the DHT.
942	///
943	/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an
944	/// item on the [`NetworkWorker`] stream.
945	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
946		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
947	}
948
949	fn put_record_to(
950		&self,
951		record: Record,
952		peers: HashSet<sc_network_types::PeerId>,
953		update_local_storage: bool,
954	) {
955		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
956			record,
957			peers,
958			update_local_storage,
959		});
960	}
961
962	fn store_record(
963		&self,
964		key: KademliaKey,
965		value: Vec<u8>,
966		publisher: Option<sc_network_types::PeerId>,
967		expires: Option<Instant>,
968	) {
969		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
970			key,
971			value,
972			publisher.map(Into::into),
973			expires,
974		));
975	}
976}
977
978#[async_trait::async_trait]
979impl<B, H> NetworkStatusProvider for NetworkService<B, H>
980where
981	B: BlockT + 'static,
982	H: ExHashT,
983{
984	async fn status(&self) -> Result<NetworkStatus, ()> {
985		let (tx, rx) = oneshot::channel();
986
987		let _ = self
988			.to_worker
989			.unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
990
991		match rx.await {
992			Ok(v) => v.map_err(|_| ()),
993			// The channel can only be closed if the network worker no longer exists.
994			Err(_) => Err(()),
995		}
996	}
997
998	async fn network_state(&self) -> Result<NetworkState, ()> {
999		let (tx, rx) = oneshot::channel();
1000
1001		let _ = self
1002			.to_worker
1003			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
1004
1005		match rx.await {
1006			Ok(v) => v.map_err(|_| ()),
1007			// The channel can only be closed if the network worker no longer exists.
1008			Err(_) => Err(()),
1009		}
1010	}
1011}
1012
1013#[async_trait::async_trait]
1014impl<B, H> NetworkPeers for NetworkService<B, H>
1015where
1016	B: BlockT + 'static,
1017	H: ExHashT,
1018{
1019	fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
1020		self.sync_protocol_handle
1021			.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
1022	}
1023
1024	fn set_authorized_only(&self, reserved_only: bool) {
1025		self.sync_protocol_handle.set_reserved_only(reserved_only);
1026	}
1027
1028	fn add_known_address(
1029		&self,
1030		peer_id: sc_network_types::PeerId,
1031		addr: sc_network_types::multiaddr::Multiaddr,
1032	) {
1033		let _ = self
1034			.to_worker
1035			.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1036	}
1037
1038	fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
1039		self.peer_store_handle.report_peer(peer_id, cost_benefit);
1040	}
1041
1042	fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
1043		self.peer_store_handle.peer_reputation(peer_id)
1044	}
1045
1046	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
1047		let _ = self
1048			.to_worker
1049			.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1050	}
1051
1052	fn accept_unreserved_peers(&self) {
1053		self.sync_protocol_handle.set_reserved_only(false);
1054	}
1055
1056	fn deny_unreserved_peers(&self) {
1057		self.sync_protocol_handle.set_reserved_only(true);
1058	}
1059
1060	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1061		// Make sure the local peer ID is never added as a reserved peer.
1062		if peer.peer_id == self.local_peer_id.into() {
1063			return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1064		}
1065
1066		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1067			peer.peer_id.into(),
1068			peer.multiaddr.into(),
1069		));
1070		self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1071
1072		Ok(())
1073	}
1074
1075	fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
1076		self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1077	}
1078
1079	fn set_reserved_peers(
1080		&self,
1081		protocol: ProtocolName,
1082		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1083	) -> Result<(), String> {
1084		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1085			return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol))
1086		};
1087
1088		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1089		let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1090
1091		let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1092
1093		for (peer_id, addr) in peers_addrs.into_iter() {
1094			// Make sure the local peer ID is never added to the PSM.
1095			if peer_id == self.local_peer_id {
1096				return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1097			}
1098
1099			peers.insert(peer_id.into());
1100
1101			if !addr.is_empty() {
1102				let _ = self
1103					.to_worker
1104					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1105			}
1106		}
1107
1108		self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1109
1110		Ok(())
1111	}
1112
1113	fn add_peers_to_reserved_set(
1114		&self,
1115		protocol: ProtocolName,
1116		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1117	) -> Result<(), String> {
1118		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1119			return Err(format!(
1120				"Cannot add peers to reserved set of unknown protocol: {}",
1121				protocol
1122			))
1123		};
1124
1125		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1126		let peers = self.split_multiaddr_and_peer_id(peers)?;
1127
1128		for (peer_id, addr) in peers.into_iter() {
1129			// Make sure the local peer ID is never added to the PSM.
1130			if peer_id == self.local_peer_id {
1131				return Err("Local peer ID cannot be added as a reserved peer.".to_string())
1132			}
1133
1134			if !addr.is_empty() {
1135				let _ = self
1136					.to_worker
1137					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1138			}
1139
1140			self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1141		}
1142
1143		Ok(())
1144	}
1145
1146	fn remove_peers_from_reserved_set(
1147		&self,
1148		protocol: ProtocolName,
1149		peers: Vec<sc_network_types::PeerId>,
1150	) -> Result<(), String> {
1151		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1152			return Err(format!(
1153				"Cannot remove peers from reserved set of unknown protocol: {}",
1154				protocol
1155			))
1156		};
1157
1158		for peer_id in peers.into_iter() {
1159			self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1160		}
1161
1162		Ok(())
1163	}
1164
1165	fn sync_num_connected(&self) -> usize {
1166		self.num_connected.load(Ordering::Relaxed)
1167	}
1168
1169	fn peer_role(
1170		&self,
1171		peer_id: sc_network_types::PeerId,
1172		handshake: Vec<u8>,
1173	) -> Option<ObservedRole> {
1174		match Roles::decode_all(&mut &handshake[..]) {
1175			Ok(role) => Some(role.into()),
1176			Err(_) => {
1177				log::debug!(target: "sub-libp2p", "handshake doesn't contain peer role: {handshake:?}");
1178				self.peer_store_handle.peer_role(&(peer_id.into()))
1179			},
1180		}
1181	}
1182
1183	/// Get the list of reserved peers.
1184	///
1185	/// Returns an error if the `NetworkWorker` is no longer running.
1186	async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
1187		let (tx, rx) = oneshot::channel();
1188
1189		self.sync_protocol_handle.reserved_peers(tx);
1190
1191		// The channel can only be closed if `ProtocolController` no longer exists.
1192		rx.await
1193			.map(|peers| peers.into_iter().map(From::from).collect())
1194			.map_err(|_| ())
1195	}
1196}
1197
1198impl<B, H> NetworkEventStream for NetworkService<B, H>
1199where
1200	B: BlockT + 'static,
1201	H: ExHashT,
1202{
1203	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1204		let (tx, rx) = out_events::channel(name, 100_000);
1205		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1206		Box::pin(rx)
1207	}
1208}
1209
1210#[async_trait::async_trait]
1211impl<B, H> NetworkRequest for NetworkService<B, H>
1212where
1213	B: BlockT + 'static,
1214	H: ExHashT,
1215{
1216	async fn request(
1217		&self,
1218		target: sc_network_types::PeerId,
1219		protocol: ProtocolName,
1220		request: Vec<u8>,
1221		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1222		connect: IfDisconnected,
1223	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1224		let (tx, rx) = oneshot::channel();
1225
1226		self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1227
1228		match rx.await {
1229			Ok(v) => v,
1230			// The channel can only be closed if the network worker no longer exists. If the
1231			// network worker no longer exists, then all connections to `target` are necessarily
1232			// closed, and we legitimately report this situation as a "ConnectionClosed".
1233			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1234		}
1235	}
1236
1237	fn start_request(
1238		&self,
1239		target: sc_network_types::PeerId,
1240		protocol: ProtocolName,
1241		request: Vec<u8>,
1242		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1243		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1244		connect: IfDisconnected,
1245	) {
1246		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1247			target: target.into(),
1248			protocol: protocol.into(),
1249			request,
1250			fallback_request,
1251			pending_response: tx,
1252			connect,
1253		});
1254	}
1255}
1256
1257/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
1258#[must_use]
1259pub struct NotificationSender {
1260	sink: NotificationsSink,
1261
1262	/// Name of the protocol on the wire.
1263	protocol_name: ProtocolName,
1264
1265	/// Field extracted from the [`Metrics`] struct and necessary to report the
1266	/// notifications-related metrics.
1267	notification_size_metric: Option<Histogram>,
1268}
1269
1270#[async_trait::async_trait]
1271impl NotificationSenderT for NotificationSender {
1272	async fn ready(
1273		&self,
1274	) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1275		Ok(Box::new(NotificationSenderReady {
1276			ready: match self.sink.reserve_notification().await {
1277				Ok(r) => Some(r),
1278				Err(()) => return Err(NotificationSenderError::Closed),
1279			},
1280			peer_id: self.sink.peer_id(),
1281			protocol_name: &self.protocol_name,
1282			notification_size_metric: self.notification_size_metric.clone(),
1283		}))
1284	}
1285}
1286
1287/// Reserved slot in the notifications buffer, ready to accept data.
1288#[must_use]
1289pub struct NotificationSenderReady<'a> {
1290	ready: Option<Ready<'a>>,
1291
1292	/// Target of the notification.
1293	peer_id: &'a PeerId,
1294
1295	/// Name of the protocol on the wire.
1296	protocol_name: &'a ProtocolName,
1297
1298	/// Field extracted from the [`Metrics`] struct and necessary to report the
1299	/// notifications-related metrics.
1300	notification_size_metric: Option<Histogram>,
1301}
1302
1303impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1304	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1305		if let Some(notification_size_metric) = &self.notification_size_metric {
1306			notification_size_metric.observe(notification.len() as f64);
1307		}
1308
1309		trace!(
1310			target: "sub-libp2p",
1311			"External API => Notification({:?}, {}, {} bytes)",
1312			self.peer_id, self.protocol_name, notification.len(),
1313		);
1314		trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id);
1315
1316		self.ready
1317			.take()
1318			.ok_or(NotificationSenderError::Closed)?
1319			.send(notification)
1320			.map_err(|()| NotificationSenderError::Closed)
1321	}
1322}
1323
1324/// Messages sent from the `NetworkService` to the `NetworkWorker`.
1325///
1326/// Each entry corresponds to a method of `NetworkService`.
1327enum ServiceToWorkerMsg {
1328	GetValue(KademliaKey),
1329	PutValue(KademliaKey, Vec<u8>),
1330	PutRecordTo {
1331		record: Record,
1332		peers: HashSet<sc_network_types::PeerId>,
1333		update_local_storage: bool,
1334	},
1335	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
1336	AddKnownAddress(PeerId, Multiaddr),
1337	EventStream(out_events::Sender),
1338	Request {
1339		target: PeerId,
1340		protocol: ProtocolName,
1341		request: Vec<u8>,
1342		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1343		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1344		connect: IfDisconnected,
1345	},
1346	NetworkStatus {
1347		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
1348	},
1349	NetworkState {
1350		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
1351	},
1352	DisconnectPeer(PeerId, ProtocolName),
1353}
1354
1355/// Main network worker. Must be polled in order for the network to advance.
1356///
1357/// You are encouraged to poll this in a separate background thread or task.
1358#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
1359pub struct NetworkWorker<B, H>
1360where
1361	B: BlockT + 'static,
1362	H: ExHashT,
1363{
1364	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
1365	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
1366	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
1367	num_connected: Arc<AtomicUsize>,
1368	/// The network service that can be extracted and shared through the codebase.
1369	service: Arc<NetworkService<B, H>>,
1370	/// The *actual* network.
1371	network_service: Swarm<Behaviour<B>>,
1372	/// Messages from the [`NetworkService`] that must be processed.
1373	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
1374	/// Senders for events that happen on the network.
1375	event_streams: out_events::OutChannels,
1376	/// Prometheus network metrics.
1377	metrics: Option<Metrics>,
1378	/// The `PeerId`'s of all boot nodes mapped to the registered addresses.
1379	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
1380	/// Boot nodes that we already have reported as invalid.
1381	reported_invalid_boot_nodes: HashSet<PeerId>,
1382	/// Peer reputation store handle.
1383	peer_store_handle: Arc<dyn PeerStoreProvider>,
1384	/// Notification protocol handles.
1385	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
1386	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
1387	/// compatibility.
1388	_marker: PhantomData<H>,
1389	/// Marker for block type
1390	_block: PhantomData<B>,
1391}
1392
1393impl<B, H> NetworkWorker<B, H>
1394where
1395	B: BlockT + 'static,
1396	H: ExHashT,
1397{
1398	/// Run the network.
1399	pub async fn run(mut self) {
1400		while self.next_action().await {}
1401	}
1402
1403	/// Perform one action on the network.
1404	///
1405	/// Returns `false` when the worker should be shutdown.
1406	/// Use in tests only.
1407	pub async fn next_action(&mut self) -> bool {
1408		futures::select! {
1409			// Next message from the service.
1410			msg = self.from_service.next() => {
1411				if let Some(msg) = msg {
1412					self.handle_worker_message(msg);
1413				} else {
1414					return false
1415				}
1416			},
1417			// Next event from `Swarm` (the stream guaranteed to never terminate).
1418			event = self.network_service.select_next_some() => {
1419				self.handle_swarm_event(event);
1420			},
1421		};
1422
1423		// Update the `num_connected` count shared with the `NetworkService`.
1424		let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1425		self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1426
1427		if let Some(metrics) = self.metrics.as_ref() {
1428			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1429				for (lower_ilog2_bucket_bound, num_entries) in buckets {
1430					metrics
1431						.kbuckets_num_nodes
1432						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1433						.set(num_entries as u64);
1434				}
1435			}
1436			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1437				metrics.kademlia_records_count.set(num_entries as u64);
1438			}
1439			if let Some(num_entries) =
1440				self.network_service.behaviour_mut().kademlia_records_total_size()
1441			{
1442				metrics.kademlia_records_sizes_total.set(num_entries as u64);
1443			}
1444
1445			metrics.pending_connections.set(
1446				Swarm::network_info(&self.network_service).connection_counters().num_pending()
1447					as u64,
1448			);
1449		}
1450
1451		true
1452	}
1453
1454	/// Process the next message coming from the `NetworkService`.
1455	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1456		match msg {
1457			ServiceToWorkerMsg::GetValue(key) =>
1458				self.network_service.behaviour_mut().get_value(key),
1459			ServiceToWorkerMsg::PutValue(key, value) =>
1460				self.network_service.behaviour_mut().put_value(key, value),
1461			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1462				.network_service
1463				.behaviour_mut()
1464				.put_record_to(record, peers, update_local_storage),
1465			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1466				.network_service
1467				.behaviour_mut()
1468				.store_record(key, value, publisher, expires),
1469			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
1470				self.network_service.behaviour_mut().add_known_address(peer_id, addr),
1471			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1472			ServiceToWorkerMsg::Request {
1473				target,
1474				protocol,
1475				request,
1476				fallback_request,
1477				pending_response,
1478				connect,
1479			} => {
1480				self.network_service.behaviour_mut().send_request(
1481					&target,
1482					protocol,
1483					request,
1484					fallback_request,
1485					pending_response,
1486					connect,
1487				);
1488			},
1489			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1490				let _ = pending_response.send(Ok(self.status()));
1491			},
1492			ServiceToWorkerMsg::NetworkState { pending_response } => {
1493				let _ = pending_response.send(Ok(self.network_state()));
1494			},
1495			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1496				.network_service
1497				.behaviour_mut()
1498				.user_protocol_mut()
1499				.disconnect_peer(&who, protocol_name),
1500		}
1501	}
1502
1503	/// Process the next event coming from `Swarm`.
1504	#[allow(deprecated)]
1505	fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut, THandlerErr<Behaviour<B>>>) {
1506		match event {
1507			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1508				if let Some(metrics) = self.metrics.as_ref() {
1509					match result {
1510						Ok(serve_time) => {
1511							metrics
1512								.requests_in_success_total
1513								.with_label_values(&[&protocol])
1514								.observe(serve_time.as_secs_f64());
1515						},
1516						Err(err) => {
1517							let reason = match err {
1518								ResponseFailure::Network(InboundFailure::Timeout) =>
1519									Some("timeout"),
1520								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1521								// `UnsupportedProtocols` is reported for every single
1522								// inbound request whenever a request with an unsupported
1523								// protocol is received. This is not reported in order to
1524								// avoid confusions.
1525									None,
1526								ResponseFailure::Network(InboundFailure::ResponseOmission) =>
1527									Some("busy-omitted"),
1528								ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
1529									Some("connection-closed"),
1530							};
1531
1532							if let Some(reason) = reason {
1533								metrics
1534									.requests_in_failure_total
1535									.with_label_values(&[&protocol, reason])
1536									.inc();
1537							}
1538						},
1539					}
1540				}
1541			},
1542			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1543				protocol,
1544				duration,
1545				result,
1546				..
1547			}) =>
1548				if let Some(metrics) = self.metrics.as_ref() {
1549					match result {
1550						Ok(_) => {
1551							metrics
1552								.requests_out_success_total
1553								.with_label_values(&[&protocol])
1554								.observe(duration.as_secs_f64());
1555						},
1556						Err(err) => {
1557							let reason = match err {
1558								RequestFailure::NotConnected => "not-connected",
1559								RequestFailure::UnknownProtocol => "unknown-protocol",
1560								RequestFailure::Refused => "refused",
1561								RequestFailure::Obsolete => "obsolete",
1562								RequestFailure::Network(OutboundFailure::DialFailure) =>
1563									"dial-failure",
1564								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1565								RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
1566									"connection-closed",
1567								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
1568									"unsupported",
1569							};
1570
1571							metrics
1572								.requests_out_failure_total
1573								.with_label_values(&[&protocol, reason])
1574								.inc();
1575						},
1576					}
1577				},
1578			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1579				for change in changes {
1580					self.peer_store_handle.report_peer(peer.into(), change);
1581				}
1582			},
1583			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1584				peer_id,
1585				info:
1586					IdentifyInfo {
1587						protocol_version, agent_version, mut listen_addrs, protocols, ..
1588					},
1589			}) => {
1590				if listen_addrs.len() > 30 {
1591					debug!(
1592						target: "sub-libp2p",
1593						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1594						peer_id, protocol_version, agent_version
1595					);
1596					listen_addrs.truncate(30);
1597				}
1598				for addr in listen_addrs {
1599					self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1600						&peer_id,
1601						&protocols,
1602						addr.clone(),
1603					);
1604				}
1605				self.peer_store_handle.add_known_peer(peer_id.into());
1606			},
1607			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1608				self.peer_store_handle.add_known_peer(peer_id.into());
1609			},
1610			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1611				if let Some(metrics) = self.metrics.as_ref() {
1612					metrics.kademlia_random_queries_total.inc();
1613				}
1614			},
1615			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1616				remote,
1617				set_id,
1618				direction,
1619				negotiated_fallback,
1620				notifications_sink,
1621				received_handshake,
1622			}) => {
1623				let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1624					remote,
1625					direction,
1626					received_handshake,
1627					negotiated_fallback,
1628					notifications_sink,
1629				);
1630			},
1631			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1632				remote,
1633				set_id,
1634				notifications_sink,
1635			}) => {
1636				let _ = self.notif_protocol_handles[usize::from(set_id)]
1637					.report_notification_sink_replaced(remote, notifications_sink);
1638
1639				// TODO: Notifications might have been lost as a result of the previous
1640				// connection being dropped, and as a result it would be preferable to notify
1641				// the users of this fact by simulating the substream being closed then
1642				// reopened.
1643				// The code below doesn't compile because `role` is unknown. Propagating the
1644				// handshake of the secondary connections is quite an invasive change and
1645				// would conflict with https://github.com/paritytech/substrate/issues/6403.
1646				// Considering that dropping notifications is generally regarded as
1647				// acceptable, this bug is at the moment intentionally left there and is
1648				// intended to be fixed at the same time as
1649				// https://github.com/paritytech/substrate/issues/6403.
1650				// self.event_streams.send(Event::NotificationStreamClosed {
1651				// remote,
1652				// protocol,
1653				// });
1654				// self.event_streams.send(Event::NotificationStreamOpened {
1655				// remote,
1656				// protocol,
1657				// role,
1658				// });
1659			},
1660			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1661				let _ = self.notif_protocol_handles[usize::from(set_id)]
1662					.report_substream_closed(remote);
1663			},
1664			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1665				remote,
1666				set_id,
1667				notification,
1668			}) => {
1669				let _ = self.notif_protocol_handles[usize::from(set_id)]
1670					.report_notification_received(remote, notification);
1671			},
1672			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1673				match (self.metrics.as_ref(), duration) {
1674					(Some(metrics), Some(duration)) => {
1675						let query_type = match event {
1676							DhtEvent::ValueFound(_) => "value-found",
1677							DhtEvent::ValueNotFound(_) => "value-not-found",
1678							DhtEvent::ValuePut(_) => "value-put",
1679							DhtEvent::ValuePutFailed(_) => "value-put-failed",
1680							DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1681						};
1682						metrics
1683							.kademlia_query_duration
1684							.with_label_values(&[query_type])
1685							.observe(duration.as_secs_f64());
1686					},
1687					_ => {},
1688				}
1689
1690				self.event_streams.send(Event::Dht(event));
1691			},
1692			SwarmEvent::Behaviour(BehaviourOut::None) => {
1693				// Ignored event from lower layers.
1694			},
1695			SwarmEvent::ConnectionEstablished {
1696				peer_id,
1697				endpoint,
1698				num_established,
1699				concurrent_dial_errors,
1700				..
1701			} => {
1702				if let Some(errors) = concurrent_dial_errors {
1703					debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1704				} else {
1705					debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
1706				}
1707
1708				if let Some(metrics) = self.metrics.as_ref() {
1709					let direction = match endpoint {
1710						ConnectedPoint::Dialer { .. } => "out",
1711						ConnectedPoint::Listener { .. } => "in",
1712					};
1713					metrics.connections_opened_total.with_label_values(&[direction]).inc();
1714
1715					if num_established.get() == 1 {
1716						metrics.distinct_peers_connections_opened_total.inc();
1717					}
1718				}
1719			},
1720			SwarmEvent::ConnectionClosed {
1721				connection_id,
1722				peer_id,
1723				cause,
1724				endpoint,
1725				num_established,
1726			} => {
1727				debug!(target: "sub-libp2p", "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1728				if let Some(metrics) = self.metrics.as_ref() {
1729					let direction = match endpoint {
1730						ConnectedPoint::Dialer { .. } => "out",
1731						ConnectedPoint::Listener { .. } => "in",
1732					};
1733					let reason = match cause {
1734						Some(ConnectionError::IO(_)) => "transport-error",
1735						Some(ConnectionError::Handler(Either::Left(Either::Left(
1736							Either::Left(Either::Right(
1737								NotifsHandlerError::SyncNotificationsClogged,
1738							)),
1739						)))) => "sync-notifications-clogged",
1740						Some(ConnectionError::Handler(Either::Left(Either::Left(
1741							Either::Right(Either::Left(_)),
1742						)))) => "ping-timeout",
1743						Some(ConnectionError::Handler(_)) => "protocol-error",
1744						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1745						None => "actively-closed",
1746					};
1747					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1748
1749					// `num_established` represents the number of *remaining* connections.
1750					if num_established == 0 {
1751						metrics.distinct_peers_connections_closed_total.inc();
1752					}
1753				}
1754			},
1755			SwarmEvent::NewListenAddr { address, .. } => {
1756				trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
1757				if let Some(metrics) = self.metrics.as_ref() {
1758					metrics.listeners_local_addresses.inc();
1759				}
1760				self.listen_addresses.lock().insert(address.clone());
1761			},
1762			SwarmEvent::ExpiredListenAddr { address, .. } => {
1763				info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
1764				if let Some(metrics) = self.metrics.as_ref() {
1765					metrics.listeners_local_addresses.dec();
1766				}
1767				self.listen_addresses.lock().remove(&address);
1768			},
1769			SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1770				if let Some(peer_id) = peer_id {
1771					trace!(
1772						target: "sub-libp2p",
1773						"Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1774					);
1775
1776					let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1777
1778					if let Some(addresses) =
1779						not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1780					{
1781						if let DialError::WrongPeerId { obtained, endpoint } = &error {
1782							if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint {
1783								let address_without_peer_id = parse_addr(address.clone().into())
1784									.map_or_else(|_| address.clone(), |r| r.1.into());
1785
1786								// Only report for address of boot node that was added at startup of
1787								// the node and not for any address that the node learned of the
1788								// boot node.
1789								if addresses.iter().any(|a| address_without_peer_id == *a) {
1790									warn!(
1791										"💔 The bootnode you want to connect to at `{address}` provided a \
1792										 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1793									);
1794
1795									self.reported_invalid_boot_nodes.insert(peer_id);
1796								}
1797							}
1798						}
1799					}
1800				}
1801
1802				if let Some(metrics) = self.metrics.as_ref() {
1803					#[allow(deprecated)]
1804					let reason = match error {
1805						DialError::Denied { cause } =>
1806							if cause.downcast::<Exceeded>().is_ok() {
1807								Some("limit-reached")
1808							} else {
1809								None
1810							},
1811						DialError::LocalPeerId { .. } => Some("local-peer-id"),
1812						DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1813						DialError::Transport(_) => Some("transport-error"),
1814						DialError::NoAddresses |
1815						DialError::DialPeerConditionFalse(_) |
1816						DialError::Aborted => None, // ignore them
1817					};
1818					if let Some(reason) = reason {
1819						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1820					}
1821				}
1822			},
1823			SwarmEvent::Dialing { connection_id, peer_id } => {
1824				trace!(target: "sub-libp2p", "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1825			},
1826			SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1827				trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1828				if let Some(metrics) = self.metrics.as_ref() {
1829					metrics.incoming_connections_total.inc();
1830				}
1831			},
1832			SwarmEvent::IncomingConnectionError {
1833				connection_id,
1834				local_addr,
1835				send_back_addr,
1836				error,
1837			} => {
1838				debug!(
1839					target: "sub-libp2p",
1840					"Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1841				);
1842				if let Some(metrics) = self.metrics.as_ref() {
1843					#[allow(deprecated)]
1844					let reason = match error {
1845						ListenError::Denied { cause } =>
1846							if cause.downcast::<Exceeded>().is_ok() {
1847								Some("limit-reached")
1848							} else {
1849								None
1850							},
1851						ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } =>
1852							Some("invalid-peer-id"),
1853						ListenError::Transport(_) => Some("transport-error"),
1854						ListenError::Aborted => None, // ignore it
1855					};
1856
1857					if let Some(reason) = reason {
1858						metrics
1859							.incoming_connections_errors_total
1860							.with_label_values(&[reason])
1861							.inc();
1862					}
1863				}
1864			},
1865			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1866				if let Some(metrics) = self.metrics.as_ref() {
1867					metrics.listeners_local_addresses.sub(addresses.len() as u64);
1868				}
1869				let mut listen_addresses = self.listen_addresses.lock();
1870				for addr in &addresses {
1871					listen_addresses.remove(addr);
1872				}
1873				drop(listen_addresses);
1874
1875				let addrs =
1876					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1877				match reason {
1878					Ok(()) => error!(
1879						target: "sub-libp2p",
1880						"📪 Libp2p listener ({}) closed gracefully",
1881						addrs
1882					),
1883					Err(e) => error!(
1884						target: "sub-libp2p",
1885						"📪 Libp2p listener ({}) closed: {}",
1886						addrs, e
1887					),
1888				}
1889			},
1890			SwarmEvent::ListenerError { error, .. } => {
1891				debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
1892				if let Some(metrics) = self.metrics.as_ref() {
1893					metrics.listeners_errors_total.inc();
1894				}
1895			},
1896		}
1897	}
1898}
1899
1900impl<B, H> Unpin for NetworkWorker<B, H>
1901where
1902	B: BlockT + 'static,
1903	H: ExHashT,
1904{
1905}
1906
1907pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1908	addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1909	transport: &TransportConfig,
1910) -> Result<(), Error> {
1911	use sc_network_types::multiaddr::Protocol;
1912
1913	if matches!(transport, TransportConfig::MemoryOnly) {
1914		let addresses: Vec<_> = addresses
1915			.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1916			.cloned()
1917			.collect();
1918
1919		if !addresses.is_empty() {
1920			return Err(Error::AddressesForAnotherTransport {
1921				transport: transport.clone(),
1922				addresses,
1923			})
1924		}
1925	} else {
1926		let addresses: Vec<_> = addresses
1927			.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1928			.cloned()
1929			.collect();
1930
1931		if !addresses.is_empty() {
1932			return Err(Error::AddressesForAnotherTransport {
1933				transport: transport.clone(),
1934				addresses,
1935			})
1936		}
1937	}
1938
1939	Ok(())
1940}