sc_network/litep2p/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	service::{
43		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44		out_events,
45		traits::{BandwidthSink, NetworkBackend, NetworkService},
46	},
47	NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
53use litep2p::{
54	config::ConfigBuilder,
55	crypto::ed25519::Keypair,
56	error::{DialError, NegotiationError},
57	executor::Executor,
58	protocol::{
59		libp2p::{
60			bitswap::Config as BitswapConfig,
61			kademlia::{QueryId, Record},
62		},
63		request_response::ConfigBuilder as RequestResponseConfigBuilder,
64	},
65	transport::{
66		tcp::config::Config as TcpTransportConfig,
67		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
68	},
69	types::{
70		multiaddr::{Multiaddr, Protocol},
71		ConnectionId,
72	},
73	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
74};
75use prometheus_endpoint::Registry;
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84	cmp,
85	collections::{hash_map::Entry, HashMap, HashSet},
86	fs,
87	future::Future,
88	iter,
89	pin::Pin,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
102/// Timeout for connection waiting new substreams.
103const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
104
105/// Litep2p bandwidth sink.
106struct Litep2pBandwidthSink {
107	sink: litep2p::BandwidthSink,
108}
109
110impl BandwidthSink for Litep2pBandwidthSink {
111	fn total_inbound(&self) -> u64 {
112		self.sink.inbound() as u64
113	}
114
115	fn total_outbound(&self) -> u64 {
116		self.sink.outbound() as u64
117	}
118}
119
120/// Litep2p task executor.
121struct Litep2pExecutor {
122	/// Executor.
123	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
124}
125
126impl Executor for Litep2pExecutor {
127	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
128		(self.executor)(future)
129	}
130
131	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
132		(self.executor)(future)
133	}
134}
135
136/// Logging target for the file.
137const LOG_TARGET: &str = "sub-libp2p";
138
139/// Peer context.
140struct ConnectionContext {
141	/// Peer endpoints.
142	endpoints: HashMap<ConnectionId, Endpoint>,
143
144	/// Number of active connections.
145	num_connections: usize,
146}
147
148/// Networking backend for `litep2p`.
149pub struct Litep2pNetworkBackend {
150	/// Main `litep2p` object.
151	litep2p: Litep2p,
152
153	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
154	network_service: Arc<dyn NetworkService>,
155
156	/// RX channel for receiving commands from `Litep2pNetworkService`.
157	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,
158
159	/// `Peerset` handles to notification protocols.
160	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
161
162	/// Pending `GET_VALUE` queries.
163	pending_get_values: HashMap<QueryId, (RecordKey, Instant)>,
164
165	/// Pending `PUT_VALUE` queries.
166	pending_put_values: HashMap<QueryId, (RecordKey, Instant)>,
167
168	/// Discovery.
169	discovery: Discovery,
170
171	/// Number of connected peers.
172	num_connected: Arc<AtomicUsize>,
173
174	/// Connected peers.
175	peers: HashMap<litep2p::PeerId, ConnectionContext>,
176
177	/// Peerstore.
178	peerstore_handle: Arc<dyn PeerStoreProvider>,
179
180	/// Block announce protocol name.
181	block_announce_protocol: ProtocolName,
182
183	/// Sender for DHT events.
184	event_streams: out_events::OutChannels,
185
186	/// Prometheus metrics.
187	metrics: Option<Metrics>,
188}
189
190impl Litep2pNetworkBackend {
191	/// From an iterator of multiaddress(es), parse and group all addresses of peers
192	/// so that litep2p can consume the information easily.
193	fn parse_addresses(
194		addresses: impl Iterator<Item = Multiaddr>,
195	) -> HashMap<PeerId, Vec<Multiaddr>> {
196		addresses
197			.into_iter()
198			.filter_map(|address| match address.iter().next() {
199				Some(
200					Protocol::Dns(_) |
201					Protocol::Dns4(_) |
202					Protocol::Dns6(_) |
203					Protocol::Ip6(_) |
204					Protocol::Ip4(_),
205				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
206				{
207					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
208						.map_or(None, |peer| Some((peer, Some(address)))),
209					_ => None,
210				},
211				Some(Protocol::P2p(multihash)) =>
212					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
213				_ => None,
214			})
215			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
216				let entry = acc.entry(peer).or_default();
217				maybe_address.map(|address| entry.push(address));
218
219				acc
220			})
221	}
222
223	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
224	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
225		Self::parse_addresses(peers.into_iter())
226			.into_iter()
227			.filter_map(|(peer, addresses)| {
228				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
229				if addresses.is_empty() {
230					return Some(peer)
231				}
232
233				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
234					log::warn!(
235						target: LOG_TARGET,
236						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
237					);
238					return None
239				}
240
241				self.peerstore_handle.add_known_peer(peer);
242				Some(peer)
243			})
244			.collect()
245	}
246}
247
248impl Litep2pNetworkBackend {
249	/// Get `litep2p` keypair from `NodeKeyConfig`.
250	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
251		let secret: litep2p::crypto::ed25519::SecretKey =
252			node_key.clone().into_keypair()?.secret().into();
253
254		let local_identity = Keypair::from(secret);
255		let local_public = local_identity.public();
256		let local_peer_id = local_public.to_peer_id();
257
258		Ok((local_identity, local_peer_id))
259	}
260
261	/// Configure transport protocols for `Litep2pNetworkBackend`.
262	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
263		config: &FullNetworkConfiguration<B, H, Self>,
264	) -> ConfigBuilder {
265		let _ = match config.network_config.transport {
266			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
267			TransportConfig::Normal { .. } => false,
268		};
269		let config_builder = ConfigBuilder::new();
270
271		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
272			.network_config
273			.listen_addresses
274			.iter()
275			.filter_map(|address| {
276				use sc_network_types::multiaddr::Protocol;
277
278				let mut iter = address.iter();
279
280				match iter.next() {
281					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
282					protocol => {
283						log::error!(
284							target: LOG_TARGET,
285							"unknown protocol {protocol:?}, ignoring {address:?}",
286						);
287
288						return None
289					},
290				}
291
292				match iter.next() {
293					Some(Protocol::Tcp(_)) => match iter.next() {
294						Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
295							Some((None, Some(address.clone()))),
296						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
297						protocol => {
298							log::error!(
299								target: LOG_TARGET,
300								"unknown protocol {protocol:?}, ignoring {address:?}",
301							);
302							None
303						},
304					},
305					protocol => {
306						log::error!(
307							target: LOG_TARGET,
308							"unknown protocol {protocol:?}, ignoring {address:?}",
309						);
310						None
311					},
312				}
313			})
314			.unzip();
315
316		config_builder
317			.with_websocket(WebSocketTransportConfig {
318				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
319				yamux_config: litep2p::yamux::Config::default(),
320				nodelay: true,
321				..Default::default()
322			})
323			.with_tcp(TcpTransportConfig {
324				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
325				yamux_config: litep2p::yamux::Config::default(),
326				nodelay: true,
327				..Default::default()
328			})
329	}
330}
331
332#[async_trait::async_trait]
333impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
334	type NotificationProtocolConfig = NotificationProtocolConfig;
335	type RequestResponseProtocolConfig = RequestResponseConfig;
336	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
337	type PeerStore = Peerstore;
338	type BitswapConfig = BitswapConfig;
339
340	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
341	where
342		Self: Sized,
343	{
344		let (keypair, local_peer_id) =
345			Self::get_keypair(&params.network_config.network_config.node_key)?;
346		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
347
348		params.network_config.network_config.boot_nodes = params
349			.network_config
350			.network_config
351			.boot_nodes
352			.into_iter()
353			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
354			.collect();
355		params.network_config.network_config.default_peers_set.reserved_nodes = params
356			.network_config
357			.network_config
358			.default_peers_set
359			.reserved_nodes
360			.into_iter()
361			.filter(|reserved_node| {
362				if reserved_node.peer_id == local_peer_id.into() {
363					log::warn!(
364						target: LOG_TARGET,
365						"Local peer ID used in reserved node, ignoring: {reserved_node}",
366					);
367					false
368				} else {
369					true
370				}
371			})
372			.collect();
373
374		if let Some(path) = &params.network_config.network_config.net_config_path {
375			fs::create_dir_all(path)?;
376		}
377
378		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
379		log::info!(target: LOG_TARGET, "Running litep2p network backend");
380
381		params.network_config.sanity_check_addresses()?;
382		params.network_config.sanity_check_bootnodes()?;
383
384		let mut config_builder =
385			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
386		let known_addresses = params.network_config.known_addresses();
387		let peer_store_handle = params.network_config.peer_store_handle();
388		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
389
390		let FullNetworkConfiguration {
391			notification_protocols,
392			request_response_protocols,
393			network_config,
394			..
395		} = params.network_config;
396
397		// initialize notification protocols
398		//
399		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
400		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
401		// of Polkadot SDK to control connectivity of the notification protocol
402		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
403		let mut notif_protocols = HashMap::from_iter([(
404			params.block_announce_config.protocol_name().clone(),
405			params.block_announce_config.handle,
406		)]);
407
408		// handshake for all but the syncing protocol is set to node role
409		config_builder = notification_protocols
410			.into_iter()
411			.fold(config_builder, |config_builder, mut config| {
412				config.config.set_handshake(Roles::from(&params.role).encode());
413				notif_protocols.insert(config.protocol_name, config.handle);
414
415				config_builder.with_notification_protocol(config.config)
416			})
417			.with_notification_protocol(params.block_announce_config.config);
418
419		// initialize request-response protocols
420		let metrics = match &params.metrics_registry {
421			Some(registry) => Some(register_without_sources(registry)?),
422			None => None,
423		};
424
425		// create channels that are used to send request before initializing protocols so the
426		// senders can be passed onto all request-response protocols
427		//
428		// all protocols must have each others' senders so they can send the fallback request in
429		// case the main protocol is not supported by the remote peer and user specified a fallback
430		let (mut request_response_receivers, request_response_senders): (
431			HashMap<_, _>,
432			HashMap<_, _>,
433		) = request_response_protocols
434			.iter()
435			.map(|config| {
436				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
437				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
438			})
439			.unzip();
440
441		config_builder = request_response_protocols.into_iter().fold(
442			config_builder,
443			|config_builder, config| {
444				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
445					Litep2pProtocolName::from(config.protocol_name.clone()),
446				)
447				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
448				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
449				.with_timeout(config.request_timeout)
450				.build();
451
452				let protocol = RequestResponseProtocol::new(
453					config.protocol_name.clone(),
454					handle,
455					Arc::clone(&peer_store_handle),
456					config.inbound_queue,
457					request_response_receivers
458						.remove(&config.protocol_name)
459						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
460					request_response_senders.clone(),
461					metrics.clone(),
462				);
463
464				executor.run(Box::pin(async move {
465					protocol.run().await;
466				}));
467
468				config_builder.with_request_response_protocol(protocol_config)
469			},
470		);
471
472		// collect known addresses
473		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
474			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
475				use sc_network_types::multiaddr::Protocol;
476
477				let address = match address.iter().last() {
478					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
479						address.with(Protocol::P2p(peer.into())),
480					Some(Protocol::P2p(_)) => address,
481					_ => return acc,
482				};
483
484				acc.entry(peer.into()).or_default().push(address.into());
485				peer_store_handle.add_known_peer(peer);
486
487				acc
488			});
489
490		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
491		let listen_addresses = Arc::new(Default::default());
492		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
493			Discovery::new(
494				local_peer_id,
495				&network_config,
496				params.genesis_hash,
497				params.fork_id.as_deref(),
498				&params.protocol_id,
499				known_addresses.clone(),
500				Arc::clone(&listen_addresses),
501				Arc::clone(&peer_store_handle),
502			);
503
504		config_builder = config_builder
505			.with_known_addresses(known_addresses.clone().into_iter())
506			.with_libp2p_ping(ping_config)
507			.with_libp2p_identify(identify_config)
508			.with_libp2p_kademlia(kademlia_config)
509			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
510				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
511			))
512			// This has the same effect as `libp2p::Swarm::with_idle_connection_timeout` which is
513			// set to 10 seconds as well.
514			.with_keep_alive_timeout(KEEP_ALIVE_TIMEOUT)
515			.with_executor(executor);
516
517		if let Some(config) = maybe_mdns_config {
518			config_builder = config_builder.with_mdns(config);
519		}
520
521		if let Some(config) = params.bitswap_config {
522			config_builder = config_builder.with_libp2p_bitswap(config);
523		}
524
525		let litep2p =
526			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
527
528		litep2p.listen_addresses().for_each(|address| {
529			log::debug!(target: LOG_TARGET, "listening on: {address}");
530
531			listen_addresses.write().insert(address.clone());
532		});
533
534		let public_addresses = litep2p.public_addresses();
535		for address in network_config.public_addresses.iter() {
536			if let Err(err) = public_addresses.add_address(address.clone().into()) {
537				log::warn!(
538					target: LOG_TARGET,
539					"failed to add public address {address:?}: {err:?}",
540				);
541			}
542		}
543
544		let network_service = Arc::new(Litep2pNetworkService::new(
545			local_peer_id,
546			keypair.clone(),
547			cmd_tx,
548			Arc::clone(&peer_store_handle),
549			notif_protocols.clone(),
550			block_announce_protocol.clone(),
551			request_response_senders,
552			Arc::clone(&listen_addresses),
553			public_addresses,
554		));
555
556		// register rest of the metrics now that `Litep2p` has been created
557		let num_connected = Arc::new(Default::default());
558		let bandwidth: Arc<dyn BandwidthSink> =
559			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
560
561		if let Some(registry) = &params.metrics_registry {
562			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
563		}
564
565		Ok(Self {
566			network_service,
567			cmd_rx,
568			metrics,
569			peerset_handles: notif_protocols,
570			num_connected,
571			discovery,
572			pending_put_values: HashMap::new(),
573			pending_get_values: HashMap::new(),
574			peerstore_handle: peer_store_handle,
575			block_announce_protocol,
576			event_streams: out_events::OutChannels::new(None)?,
577			peers: HashMap::new(),
578			litep2p,
579		})
580	}
581
582	fn network_service(&self) -> Arc<dyn NetworkService> {
583		Arc::clone(&self.network_service)
584	}
585
586	fn peer_store(
587		bootnodes: Vec<sc_network_types::PeerId>,
588		metrics_registry: Option<Registry>,
589	) -> Self::PeerStore {
590		Peerstore::new(bootnodes, metrics_registry)
591	}
592
593	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
594		NotificationMetrics::new(registry)
595	}
596
597	/// Create Bitswap server.
598	fn bitswap_server(
599		client: Arc<dyn BlockBackend<B> + Send + Sync>,
600	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
601		BitswapServer::new(client)
602	}
603
604	/// Create notification protocol configuration for `protocol`.
605	fn notification_config(
606		protocol_name: ProtocolName,
607		fallback_names: Vec<ProtocolName>,
608		max_notification_size: u64,
609		handshake: Option<NotificationHandshake>,
610		set_config: SetConfig,
611		metrics: NotificationMetrics,
612		peerstore_handle: Arc<dyn PeerStoreProvider>,
613	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
614		Self::NotificationProtocolConfig::new(
615			protocol_name,
616			fallback_names,
617			max_notification_size as usize,
618			handshake,
619			set_config,
620			metrics,
621			peerstore_handle,
622		)
623	}
624
625	/// Create request-response protocol configuration.
626	fn request_response_config(
627		protocol_name: ProtocolName,
628		fallback_names: Vec<ProtocolName>,
629		max_request_size: u64,
630		max_response_size: u64,
631		request_timeout: Duration,
632		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
633	) -> Self::RequestResponseProtocolConfig {
634		Self::RequestResponseProtocolConfig::new(
635			protocol_name,
636			fallback_names,
637			max_request_size,
638			max_response_size,
639			request_timeout,
640			inbound_queue,
641		)
642	}
643
644	/// Start [`Litep2pNetworkBackend`] event loop.
645	async fn run(mut self) {
646		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
647
648		loop {
649			let num_connected_peers = self
650				.peerset_handles
651				.get(&self.block_announce_protocol)
652				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
653			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
654
655			tokio::select! {
656				command = self.cmd_rx.next() => match command {
657					None => return,
658					Some(command) => match command {
659						NetworkServiceCommand::GetValue{ key } => {
660							let query_id = self.discovery.get_value(key.clone()).await;
661							self.pending_get_values.insert(query_id, (key, Instant::now()));
662						}
663						NetworkServiceCommand::PutValue { key, value } => {
664							let query_id = self.discovery.put_value(key.clone(), value).await;
665							self.pending_put_values.insert(query_id, (key, Instant::now()));
666						}
667						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
668							let kademlia_key = record.key.to_vec().into();
669							let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
670							self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
671						}
672
673						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
674							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
675						}
676						NetworkServiceCommand::EventStream { tx } => {
677							self.event_streams.push(tx);
678						}
679						NetworkServiceCommand::Status { tx } => {
680							let _ = tx.send(NetworkStatus {
681								num_connected_peers: self
682									.peerset_handles
683									.get(&self.block_announce_protocol)
684									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
685								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
686								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
687							});
688						}
689						NetworkServiceCommand::AddPeersToReservedSet {
690							protocol,
691							peers,
692						} => {
693							let peers = self.add_addresses(peers.into_iter().map(Into::into));
694
695							match self.peerset_handles.get(&protocol) {
696								Some(handle) => {
697									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
698								}
699								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
700							};
701						}
702						NetworkServiceCommand::AddKnownAddress { peer, address } => {
703							let mut address: Multiaddr = address.into();
704
705							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
706								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
707							}
708
709							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
710								log::debug!(
711									target: LOG_TARGET,
712									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
713								);
714							}
715						},
716						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
717							let peers = self.add_addresses(peers.into_iter().map(Into::into));
718
719							match self.peerset_handles.get(&protocol) {
720								Some(handle) => {
721									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
722								}
723								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
724							}
725
726						},
727						NetworkServiceCommand::DisconnectPeer {
728							protocol,
729							peer,
730						} => {
731							let Some(handle) = self.peerset_handles.get(&protocol) else {
732								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
733								continue
734							};
735
736							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
737						}
738						NetworkServiceCommand::SetReservedOnly {
739							protocol,
740							reserved_only,
741						} => {
742							let Some(handle) = self.peerset_handles.get(&protocol) else {
743								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
744								continue
745							};
746
747							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
748						}
749						NetworkServiceCommand::RemoveReservedPeers {
750							protocol,
751							peers,
752						} => {
753							let Some(handle) = self.peerset_handles.get(&protocol) else {
754								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
755								continue
756							};
757
758							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
759						}
760					}
761				},
762				event = self.discovery.next() => match event {
763					None => return,
764					Some(DiscoveryEvent::Discovered { addresses }) => {
765						// if at least one address was added for the peer, report the peer to `Peerstore`
766						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
767							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
768								self.peerstore_handle.add_known_peer(peer);
769							}
770						}
771					}
772					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
773						for peer in peers {
774							self.peerstore_handle.add_known_peer(peer.into());
775						}
776					}
777					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
778						if !self.pending_get_values.contains_key(&query_id) {
779							log::error!(
780								target: LOG_TARGET,
781								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
782							);
783
784							continue
785						}
786
787						let peer_id: sc_network_types::PeerId = record.peer.into();
788						let record = PeerRecord {
789							record: P2PRecord {
790								key: record.record.key.to_vec().into(),
791								value: record.record.value,
792								publisher: record.record.publisher.map(|peer_id| {
793									let peer_id: sc_network_types::PeerId = peer_id.into();
794									peer_id.into()
795								}),
796								expires: record.record.expires,
797							},
798							peer: Some(peer_id.into()),
799						};
800
801						self.event_streams.send(
802							Event::Dht(
803								DhtEvent::ValueFound(
804									record.into()
805								)
806							)
807						);
808					}
809					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
810						match self.pending_get_values.remove(&query_id) {
811							Some((key, started)) => {
812								log::trace!(
813									target: LOG_TARGET,
814									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
815								);
816
817								if let Some(ref metrics) = self.metrics {
818									metrics
819										.kademlia_query_duration
820										.with_label_values(&["value-get"])
821										.observe(started.elapsed().as_secs_f64());
822								}
823							},
824							None => {
825								log::error!(
826									target: LOG_TARGET,
827									"Missing/invalid pending query for `GET_VALUE`: {query_id:?}"
828								);
829								debug_assert!(false);
830							},
831						}
832					}
833					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
834						match self.pending_put_values.remove(&query_id) {
835							None => log::warn!(
836								target: LOG_TARGET,
837								"`PUT_VALUE` succeeded for a non-existent query",
838							),
839							Some((key, started)) => {
840								log::trace!(
841									target: LOG_TARGET,
842									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
843								);
844
845								self.event_streams.send(Event::Dht(
846									DhtEvent::ValuePut(libp2p::kad::RecordKey::new(&key))
847								));
848
849								if let Some(ref metrics) = self.metrics {
850									metrics
851										.kademlia_query_duration
852										.with_label_values(&["value-put"])
853										.observe(started.elapsed().as_secs_f64());
854								}
855							}
856						}
857					}
858					Some(DiscoveryEvent::QueryFailed { query_id }) => {
859						match self.pending_get_values.remove(&query_id) {
860							None => match self.pending_put_values.remove(&query_id) {
861								None => log::warn!(
862									target: LOG_TARGET,
863									"non-existent query failed ({query_id:?})",
864								),
865								Some((key, started)) => {
866									log::debug!(
867										target: LOG_TARGET,
868										"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
869									);
870
871									self.event_streams.send(Event::Dht(
872										DhtEvent::ValuePutFailed(libp2p::kad::RecordKey::new(&key))
873									));
874
875									if let Some(ref metrics) = self.metrics {
876										metrics
877											.kademlia_query_duration
878											.with_label_values(&["value-put-failed"])
879											.observe(started.elapsed().as_secs_f64());
880									}
881								}
882							}
883							Some((key, started)) => {
884								log::debug!(
885									target: LOG_TARGET,
886									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
887								);
888
889								self.event_streams.send(Event::Dht(
890									DhtEvent::ValueNotFound(libp2p::kad::RecordKey::new(&key))
891								));
892
893								if let Some(ref metrics) = self.metrics {
894									metrics
895										.kademlia_query_duration
896										.with_label_values(&["value-get-failed"])
897										.observe(started.elapsed().as_secs_f64());
898								}
899							}
900						}
901					}
902					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
903						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
904					}
905					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
906						match self.litep2p.public_addresses().add_address(address.clone().into()) {
907							Ok(inserted) => if inserted {
908								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
909							},
910							Err(err) => {
911								log::warn!(
912									target: LOG_TARGET,
913									"🔍 Failed to add discovered external address {address:?}: {err:?}",
914								);
915							},
916						}
917					}
918					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
919						let local_peer_id = self.litep2p.local_peer_id();
920
921						// Litep2p requires the peer ID to be present in the address.
922						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
923							address.with(Protocol::P2p(*local_peer_id.as_ref()))
924						} else {
925							address
926						};
927
928						if self.litep2p.public_addresses().remove_address(&address) {
929							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
930						} else {
931							log::warn!(
932								target: LOG_TARGET,
933								"🔍 Failed to remove expired external address {address:?}"
934							);
935						}
936					}
937					Some(DiscoveryEvent::Ping { peer, rtt }) => {
938						log::trace!(
939							target: LOG_TARGET,
940							"ping time with {peer:?}: {rtt:?}",
941						);
942					}
943					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
944						self.event_streams.send(Event::Dht(
945							DhtEvent::PutRecordRequest(
946								libp2p::kad::RecordKey::new(&key),
947								value,
948								publisher.map(Into::into),
949								expires,
950							)
951						));
952					},
953
954					Some(DiscoveryEvent::RandomKademliaStarted) => {
955						if let Some(metrics) = self.metrics.as_ref() {
956							metrics.kademlia_random_queries_total.inc();
957						}
958					}
959				},
960				event = self.litep2p.next_event() => match event {
961					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
962						let Some(metrics) = &self.metrics else {
963							continue;
964						};
965
966						let direction = match endpoint {
967							Endpoint::Dialer { .. } => "out",
968							Endpoint::Listener { .. } => {
969								// Increment incoming connections counter.
970								//
971								// Note: For litep2p these are represented by established negotiated connections,
972								// while for libp2p (legacy) these represent not-yet-negotiated connections.
973								metrics.incoming_connections_total.inc();
974
975								"in"
976							},
977						};
978						metrics.connections_opened_total.with_label_values(&[direction]).inc();
979
980						match self.peers.entry(peer) {
981							Entry::Vacant(entry) => {
982								entry.insert(ConnectionContext {
983									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
984									num_connections: 1usize,
985								});
986								metrics.distinct_peers_connections_opened_total.inc();
987							}
988							Entry::Occupied(entry) => {
989								let entry = entry.into_mut();
990								entry.num_connections += 1;
991								entry.endpoints.insert(endpoint.connection_id(), endpoint);
992							}
993						}
994					}
995					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
996						let Some(metrics) = &self.metrics else {
997							continue;
998						};
999
1000						let Some(context) = self.peers.get_mut(&peer) else {
1001							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1002							continue
1003						};
1004
1005						let direction = match context.endpoints.remove(&connection_id) {
1006							None => {
1007								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1008								continue
1009							}
1010							Some(endpoint) => {
1011								context.num_connections -= 1;
1012
1013								match endpoint {
1014									Endpoint::Dialer { .. } => "out",
1015									Endpoint::Listener { .. } => "in",
1016								}
1017							}
1018						};
1019
1020						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1021
1022						if context.num_connections == 0 {
1023							self.peers.remove(&peer);
1024							metrics.distinct_peers_connections_closed_total.inc();
1025						}
1026					}
1027					Some(Litep2pEvent::DialFailure { address, error }) => {
1028						log::debug!(
1029							target: LOG_TARGET,
1030							"failed to dial peer at {address:?}: {error:?}",
1031						);
1032
1033						if let Some(metrics) = &self.metrics {
1034							let reason = match error {
1035								DialError::Timeout => "timeout",
1036								DialError::AddressError(_) => "invalid-address",
1037								DialError::DnsError(_) => "cannot-resolve-dns",
1038								DialError::NegotiationError(error) => match error {
1039									NegotiationError::Timeout => "timeout",
1040									NegotiationError::PeerIdMissing => "missing-peer-id",
1041									NegotiationError::StateMismatch => "state-mismatch",
1042									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1043									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1044									NegotiationError::SnowError(_) => "noise-error",
1045									NegotiationError::ParseError(_) => "parse-error",
1046									NegotiationError::IoError(_) => "io-error",
1047									NegotiationError::WebSocket(_) => "webscoket-error",
1048									NegotiationError::BadSignature => "bad-signature",
1049								}
1050							};
1051
1052							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1053						}
1054					}
1055					Some(Litep2pEvent::ListDialFailures { errors }) => {
1056						log::debug!(
1057							target: LOG_TARGET,
1058							"failed to dial peer on multiple addresses {errors:?}",
1059						);
1060
1061						if let Some(metrics) = &self.metrics {
1062							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1063						}
1064					}
1065					None => {
1066						log::error!(
1067								target: LOG_TARGET,
1068								"Litep2p backend terminated"
1069						);
1070						return
1071					}
1072				},
1073			}
1074		}
1075	}
1076}