sc_network/
discovery.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Discovery mechanisms of Substrate.
20//!
21//! The `DiscoveryBehaviour` struct implements the `NetworkBehaviour` trait of libp2p and is
22//! responsible for discovering other nodes that are part of the network.
23//!
24//! Substrate uses the following mechanisms in order to discover nodes that are part of the network:
25//!
26//! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the constructor
27//! of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add an entry.
28//!
29//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets.
30//!
31//! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests on the
32//! configured Kademlia DHTs in order for nodes to propagate to us their view of the network. This
33//! is performed automatically by the `DiscoveryBehaviour`.
34//!
//! Additionally, the `DiscoveryBehaviour` is also capable of storing and loading values in the
//! configured DHTs.
37//!
38//! ## Usage
39//!
40//! The `DiscoveryBehaviour` generates events of type `DiscoveryOut`, most notably
41//! `DiscoveryOut::Discovered` that is generated whenever we discover a node.
42//! Only the identity of the node is returned. The node's addresses are stored within the
43//! `DiscoveryBehaviour` and can be queried through the `NetworkBehaviour` trait.
44//!
45//! **Important**: In order for the discovery mechanism to work properly, there needs to be an
46//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
47//! of a node's address, you must call `add_self_reported_address`.
48
49use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56	core::{Endpoint, Multiaddr},
57	kad::{
58		self,
59		record::store::{MemoryStore, RecordStore},
60		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61		GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
62		RecordKey,
63	},
64	mdns::{self, tokio::Behaviour as TokioMdns},
65	multiaddr::Protocol,
66	swarm::{
67		behaviour::{
68			toggle::{Toggle, ToggleConnectionHandler},
69			DialFailure, ExternalAddrConfirmed, FromSwarm,
70		},
71		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters,
72		StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
73	},
74	PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80	cmp,
81	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82	num::NonZeroUsize,
83	task::{Context, Poll},
84	time::{Duration, Instant},
85};
86
/// Maximum number of known external addresses that we will cache.
/// This only affects whether we will log whenever we (re-)discover
/// a given address.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Default value for the Kademlia replication factor, which determines how many of the closest
/// peers a record is replicated to.
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// Number of successful responses a `GetRecord` query must exceed before the query is finished
/// early instead of being left to run to completion.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
98
/// `DiscoveryBehaviour` configuration.
///
/// Built with [`DiscoveryConfig::new`], adjusted through the setter methods and turned into a
/// [`DiscoveryBehaviour`] with [`DiscoveryConfig::finish`].
///
/// Note: In order to discover nodes or load and store values via Kademlia one has to add
///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
pub struct DiscoveryConfig {
	/// Identity of the local node; used to create the Kademlia store/behaviour and mDNS.
	local_peer_id: PeerId,
	/// Addresses that never expire (e.g. bootstrap or reserved nodes). They are also added to
	/// Kademlia when the behaviour is built.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Whether a periodic random walk on the DHT is scheduled.
	dht_random_walk: bool,
	/// Whether private IPv4/IPv6 addresses may be reported.
	allow_private_ip: bool,
	/// Whether non-global addresses may be inserted into the DHT.
	allow_non_globals_in_dht: bool,
	/// Number of active connections at which discovery is paused.
	discovery_only_if_under_num: u64,
	/// Whether mDNS discovery should be started.
	enable_mdns: bool,
	/// Whether iterative Kademlia queries must use disjoint paths.
	kademlia_disjoint_query_paths: bool,
	/// Genesis hash- and fork ID-based Kademlia protocol name. `None` until
	/// [`DiscoveryConfig::with_kademlia`] is called, in which case Kademlia stays disabled.
	kademlia_protocol: Option<StreamProtocol>,
	/// Legacy, `ProtocolId`-based Kademlia protocol name, set together with
	/// `kademlia_protocol`.
	kademlia_legacy_protocol: Option<StreamProtocol>,
	/// Kademlia replication factor handed to the Kademlia configuration.
	kademlia_replication_factor: NonZeroUsize,
}
117
118impl DiscoveryConfig {
119	/// Create a default configuration with the given public key.
120	pub fn new(local_peer_id: PeerId) -> Self {
121		Self {
122			local_peer_id,
123			permanent_addresses: Vec::new(),
124			dht_random_walk: true,
125			allow_private_ip: true,
126			allow_non_globals_in_dht: false,
127			discovery_only_if_under_num: std::u64::MAX,
128			enable_mdns: false,
129			kademlia_disjoint_query_paths: false,
130			kademlia_protocol: None,
131			kademlia_legacy_protocol: None,
132			kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
133				.expect("value is a constant; constant is non-zero; qed."),
134		}
135	}
136
137	/// Set the number of active connections at which we pause discovery.
138	pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
139		self.discovery_only_if_under_num = limit;
140		self
141	}
142
143	/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
144	pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
145	where
146		I: IntoIterator<Item = (PeerId, Multiaddr)>,
147	{
148		self.permanent_addresses.extend(permanent_addresses);
149		self
150	}
151
152	/// Whether the discovery behaviour should periodically perform a random
153	/// walk on the DHT to discover peers.
154	pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
155		self.dht_random_walk = value;
156		self
157	}
158
159	/// Should private IPv4/IPv6 addresses be reported?
160	pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
161		self.allow_private_ip = value;
162		self
163	}
164
165	/// Should non-global addresses be inserted to the DHT?
166	pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
167		self.allow_non_globals_in_dht = value;
168		self
169	}
170
171	/// Should MDNS discovery be supported?
172	pub fn with_mdns(&mut self, value: bool) -> &mut Self {
173		self.enable_mdns = value;
174		self
175	}
176
177	/// Add discovery via Kademlia for the given protocol.
178	///
179	/// Currently accepts `protocol_id`. This should be removed once all the nodes
180	/// are upgraded to genesis hash- and fork ID-based Kademlia protocol name.
181	pub fn with_kademlia<Hash: AsRef<[u8]>>(
182		&mut self,
183		genesis_hash: Hash,
184		fork_id: Option<&str>,
185		protocol_id: &ProtocolId,
186	) -> &mut Self {
187		self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
188		self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
189		self
190	}
191
192	/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the
193	/// presence of potentially adversarial nodes.
194	pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
195		self.kademlia_disjoint_query_paths = value;
196		self
197	}
198
199	/// Sets Kademlia replication factor.
200	pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
201		self.kademlia_replication_factor = value;
202		self
203	}
204
205	/// Create a `DiscoveryBehaviour` from this config.
206	pub fn finish(self) -> DiscoveryBehaviour {
207		let Self {
208			local_peer_id,
209			permanent_addresses,
210			dht_random_walk,
211			allow_private_ip,
212			allow_non_globals_in_dht,
213			discovery_only_if_under_num,
214			enable_mdns,
215			kademlia_disjoint_query_paths,
216			kademlia_protocol,
217			kademlia_legacy_protocol,
218			kademlia_replication_factor,
219		} = self;
220
221		let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
222			let mut config = KademliaConfig::default();
223
224			config.set_replication_factor(kademlia_replication_factor);
225			// Populate kad with both the legacy and the new protocol names.
226			// Remove the legacy protocol:
227			// https://github.com/paritytech/polkadot-sdk/issues/504
228			let kademlia_protocols = if let Some(legacy_protocol) = kademlia_legacy_protocol {
229				vec![kademlia_protocol.clone(), legacy_protocol]
230			} else {
231				vec![kademlia_protocol.clone()]
232			};
233			config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());
234
235			config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
236
237			// By default Kademlia attempts to insert all peers into its routing table once a
238			// dialing attempt succeeds. In order to control which peer is added, disable the
239			// auto-insertion and instead add peers manually.
240			config.set_kbucket_inserts(BucketInserts::Manual);
241			config.disjoint_query_paths(kademlia_disjoint_query_paths);
242			let store = MemoryStore::new(local_peer_id);
243			let mut kad = Kademlia::with_config(local_peer_id, store, config);
244			kad.set_mode(Some(kad::Mode::Server));
245
246			for (peer_id, addr) in &permanent_addresses {
247				kad.add_address(peer_id, addr.clone());
248			}
249
250			Some(kad)
251		} else {
252			None
253		};
254
255		DiscoveryBehaviour {
256			permanent_addresses,
257			ephemeral_addresses: HashMap::new(),
258			kademlia: Toggle::from(kademlia),
259			next_kad_random_query: if dht_random_walk {
260				Some(Delay::new(Duration::new(0, 0)))
261			} else {
262				None
263			},
264			duration_to_next_kad: Duration::from_secs(1),
265			pending_events: VecDeque::new(),
266			local_peer_id,
267			num_connections: 0,
268			allow_private_ip,
269			discovery_only_if_under_num,
270			mdns: if enable_mdns {
271				match TokioMdns::new(mdns::Config::default(), local_peer_id) {
272					Ok(mdns) => Toggle::from(Some(mdns)),
273					Err(err) => {
274						warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
275						Toggle::from(None)
276					},
277				}
278			} else {
279				Toggle::from(None)
280			},
281			allow_non_globals_in_dht,
282			known_external_addresses: LruHashSet::new(
283				NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
284					.expect("value is a constant; constant is non-zero; qed."),
285			),
286			records_to_publish: Default::default(),
287			kademlia_protocol,
288		}
289	}
290}
291
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour {
	/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
	/// reserved nodes.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
	/// removed.
	ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
	/// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
	/// it's always enabled in `NetworkWorker::new()`.
	kademlia: Toggle<Kademlia<MemoryStore>>,
	/// Discovers nodes on the local network.
	mdns: Toggle<TokioMdns>,
	/// Stream that fires when we need to perform the next random Kademlia query. `None` if
	/// random walking is disabled.
	next_kad_random_query: Option<Delay>,
	/// After `next_kad_random_query` triggers, the next one triggers after this duration.
	duration_to_next_kad: Duration,
	/// Events to return in priority when polled.
	pending_events: VecDeque<DiscoveryOut>,
	/// Identity of our local node.
	local_peer_id: PeerId,
	/// Number of nodes we're currently connected to.
	num_connections: u64,
	/// If false, non-global IPv4/IPv6 addresses reported by Kademlia or mDNS are filtered out
	/// when addresses are collected for an outbound connection. Addresses stored in
	/// `permanent_addresses` or `ephemeral_addresses` are not affected.
	allow_private_ip: bool,
	/// Number of active connections over which we interrupt the discovery process.
	discovery_only_if_under_num: u64,
	/// Should non-global addresses be added to the DHT?
	allow_non_globals_in_dht: bool,
	/// A cache of discovered external addresses. Only used for logging purposes.
	known_external_addresses: LruHashSet<Multiaddr>,
	/// Records to publish per QueryId.
	///
	/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
	/// did not return the record (in `FinishedWithNoAdditionalRecord`). We will then put the
	/// record to these peers.
	records_to_publish: HashMap<QueryId, Record>,
	/// The chain based kademlia protocol name (including genesis hash and fork id).
	///
	/// Remove when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
	/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
	kademlia_protocol: Option<StreamProtocol>,
}
337
338impl DiscoveryBehaviour {
339	/// Returns the list of nodes that we know exist in the network.
340	pub fn known_peers(&mut self) -> HashSet<PeerId> {
341		let mut peers = HashSet::new();
342		if let Some(k) = self.kademlia.as_mut() {
343			for b in k.kbuckets() {
344				for e in b.iter() {
345					if !peers.contains(e.node.key.preimage()) {
346						peers.insert(*e.node.key.preimage());
347					}
348				}
349			}
350		}
351		peers
352	}
353
354	/// Adds a hard-coded address for the given peer, that never expires.
355	///
356	/// This adds an entry to the parameter that was passed to `new`.
357	///
358	/// If we didn't know this address before, also generates a `Discovered` event.
359	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
360		let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
361		if addrs_list.contains(&addr) {
362			return
363		}
364
365		if let Some(k) = self.kademlia.as_mut() {
366			k.add_address(&peer_id, addr.clone());
367		}
368
369		self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
370		addrs_list.push(addr);
371	}
372
373	/// Add a self-reported address of a remote peer to the k-buckets of the DHT
374	/// if it has compatible `supported_protocols`.
375	///
376	/// **Note**: It is important that you call this method. The discovery mechanism will not
377	/// automatically add connecting peers to the Kademlia k-buckets.
378	pub fn add_self_reported_address(
379		&mut self,
380		peer_id: &PeerId,
381		supported_protocols: &[StreamProtocol],
382		addr: Multiaddr,
383	) {
384		if let Some(kademlia) = self.kademlia.as_mut() {
385			if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
386				trace!(
387					target: "sub-libp2p",
388					"Ignoring self-reported non-global address {} from {}.", addr, peer_id
389				);
390				return
391			}
392
393			// The supported protocols must include the chain-based Kademlia protocol.
394			//
395			// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
396			// when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
397			// https://github.com/paritytech/polkadot-sdk/issues/504.
398			if !supported_protocols.iter().any(|p| {
399				p == self
400					.kademlia_protocol
401					.as_ref()
402					.expect("kademlia protocol was checked above to be enabled; qed")
403			}) {
404				trace!(
405					target: "sub-libp2p",
406					"Ignoring self-reported address {} from {} as remote node is not part of the \
407					 Kademlia DHT supported by the local node.", addr, peer_id,
408				);
409				return
410			}
411
412			trace!(
413				target: "sub-libp2p",
414				"Adding self-reported address {} from {} to Kademlia DHT.",
415				addr, peer_id
416			);
417			kademlia.add_address(peer_id, addr.clone());
418		}
419	}
420
421	/// Start fetching a record from the DHT.
422	///
423	/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
424	pub fn get_value(&mut self, key: RecordKey) {
425		if let Some(k) = self.kademlia.as_mut() {
426			k.get_record(key.clone());
427		}
428	}
429
430	/// Start putting a record into the DHT. Other nodes can later fetch that value with
431	/// `get_value`.
432	///
433	/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
434	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
435		if let Some(k) = self.kademlia.as_mut() {
436			if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
437				warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e);
438				self.pending_events
439					.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
440			}
441		}
442	}
443
444	/// Puts a record into the DHT on the provided `peers`
445	///
446	/// If `update_local_storage` is true, the local storage is update as well.
447	pub fn put_record_to(
448		&mut self,
449		record: Record,
450		peers: HashSet<sc_network_types::PeerId>,
451		update_local_storage: bool,
452	) {
453		if let Some(kad) = self.kademlia.as_mut() {
454			if update_local_storage {
455				if let Err(_e) = kad.store_mut().put(record.clone()) {
456					warn!(target: "sub-libp2p", "Failed to update local starage");
457				}
458			}
459
460			if !peers.is_empty() {
461				kad.put_record_to(
462					record,
463					peers.into_iter().map(|peer_id| peer_id.into()),
464					Quorum::All,
465				);
466			}
467		}
468	}
469	/// Store a record in the Kademlia record store.
470	pub fn store_record(
471		&mut self,
472		record_key: RecordKey,
473		record_value: Vec<u8>,
474		publisher: Option<PeerId>,
475		expires: Option<Instant>,
476	) {
477		if let Some(k) = self.kademlia.as_mut() {
478			if let Err(err) = k.store_mut().put(Record {
479				key: record_key,
480				value: record_value,
481				publisher: publisher.map(|publisher| publisher.into()),
482				expires,
483			}) {
484				debug!(
485					target: "sub-libp2p",
486					"Failed to store record with key: {:?}",
487					err
488				);
489			}
490		}
491	}
492
493	/// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance.
494	///
495	/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
496	/// of their lower bound.
497	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
498		self.kademlia.as_mut().map(|kad| {
499			kad.kbuckets()
500				.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
501				.collect()
502		})
503	}
504
505	/// Returns the number of records in the Kademlia record stores.
506	pub fn num_kademlia_records(&mut self) -> Option<usize> {
507		// Note that this code is ok only because we use a `MemoryStore`.
508		self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
509	}
510
511	/// Returns the total size in bytes of all the records in the Kademlia record stores.
512	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
513		// Note that this code is ok only because we use a `MemoryStore`. If the records were
514		// for example stored on disk, this would load every single one of them every single time.
515		self.kademlia
516			.as_mut()
517			.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
518	}
519
520	/// Can the given `Multiaddr` be put into the DHT?
521	///
522	/// This test is successful only for global IP addresses and DNS names.
523	// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
524	// because the set of valid domains is highly dynamic and would require frequent
525	// updates, for example by utilising publicsuffix.org or IANA.
526	pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
527		let ip = match addr.iter().next() {
528			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
529			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
530			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
531				return true,
532			_ => return false,
533		};
534		ip.is_global()
535	}
536}
537
/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
	/// A peer has been discovered.
	///
	/// When this is reported for a newly connected peer, the peer has not been
	/// added to the routing table because [`BucketInserts::Manual`] is
	/// configured. If the peer is to be included in the routing table, it must
	/// be explicitly added via
	/// [`DiscoveryBehaviour::add_self_reported_address`].
	Discovered(PeerId),

	/// A peer connected to this node for whom no listen address is known.
	///
	/// In order for the peer to be added to the Kademlia routing table, a known
	/// listen address must be added via
	/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
	/// the `identify` protocol.
	UnroutablePeer(PeerId),

	/// The DHT yielded results for the record request.
	///
	/// Returning the found [`PeerRecord`] as well as the request duration.
	ValueFound(PeerRecord, Duration),

	/// The DHT received a put record request.
	///
	/// Returning the key, the value, the publisher (if any) and the expiration time (if any)
	/// of the received record.
	PutRecordRequest(
		RecordKey,
		Vec<u8>,
		Option<sc_network_types::PeerId>,
		Option<std::time::Instant>,
	),

	/// The record requested was not found in the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValueNotFound(RecordKey, Duration),

	/// The record with a given key was successfully inserted into the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValuePut(RecordKey, Duration),

	/// Inserting a value into the DHT failed.
	///
	/// Returning the corresponding key as well as the request duration. The duration is zero
	/// when the record could not even be submitted to Kademlia.
	ValuePutFailed(RecordKey, Duration),

	/// Started a random Kademlia query.
	///
	/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
	RandomKademliaStarted,
}
589
590impl NetworkBehaviour for DiscoveryBehaviour {
591	type ConnectionHandler =
592		ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
593	type ToSwarm = DiscoveryOut;
594
595	fn handle_established_inbound_connection(
596		&mut self,
597		connection_id: ConnectionId,
598		peer: PeerId,
599		local_addr: &Multiaddr,
600		remote_addr: &Multiaddr,
601	) -> Result<THandler<Self>, ConnectionDenied> {
602		self.kademlia.handle_established_inbound_connection(
603			connection_id,
604			peer,
605			local_addr,
606			remote_addr,
607		)
608	}
609
610	fn handle_established_outbound_connection(
611		&mut self,
612		connection_id: ConnectionId,
613		peer: PeerId,
614		addr: &Multiaddr,
615		role_override: Endpoint,
616	) -> Result<THandler<Self>, ConnectionDenied> {
617		self.kademlia.handle_established_outbound_connection(
618			connection_id,
619			peer,
620			addr,
621			role_override,
622		)
623	}
624
625	fn handle_pending_inbound_connection(
626		&mut self,
627		connection_id: ConnectionId,
628		local_addr: &Multiaddr,
629		remote_addr: &Multiaddr,
630	) -> Result<(), ConnectionDenied> {
631		self.kademlia
632			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
633	}
634
635	fn handle_pending_outbound_connection(
636		&mut self,
637		connection_id: ConnectionId,
638		maybe_peer: Option<PeerId>,
639		addresses: &[Multiaddr],
640		effective_role: Endpoint,
641	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
642		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
643
644		// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
645		// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
646		// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
647		// discovery DHT records).
648		let mut list: LinkedHashSet<_> = self
649			.permanent_addresses
650			.iter()
651			.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
652			.collect();
653
654		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
655			ephemeral_addresses.iter().for_each(|address| {
656				list.insert_if_absent(address.clone());
657			});
658		}
659
660		{
661			let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
662				connection_id,
663				maybe_peer,
664				addresses,
665				effective_role,
666			)?;
667
668			list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
669				connection_id,
670				maybe_peer,
671				addresses,
672				effective_role,
673			)?);
674
675			if !self.allow_private_ip {
676				list_to_filter.retain(|addr| match addr.iter().next() {
677					Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
678					Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
679					_ => true,
680				});
681			}
682
683			list_to_filter.into_iter().for_each(|address| {
684				list.insert_if_absent(address);
685			});
686		}
687
688		trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);
689
690		Ok(list.into_iter().collect())
691	}
692
693	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
694		match event {
695			FromSwarm::ConnectionEstablished(e) => {
696				self.num_connections += 1;
697				self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
698			},
699			FromSwarm::ConnectionClosed(e) => {
700				self.num_connections -= 1;
701				self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
702			},
703			FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
704				if let Some(peer_id) = peer_id {
705					if let DialError::Transport(errors) = error {
706						if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
707						{
708							for (addr, _error) in errors {
709								entry.get_mut().retain(|a| a != addr);
710							}
711							if entry.get().is_empty() {
712								entry.remove();
713							}
714						}
715					}
716				}
717
718				self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
719			},
720			FromSwarm::ListenerClosed(e) => {
721				self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
722			},
723			FromSwarm::ListenFailure(e) => {
724				self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
725			},
726			FromSwarm::ListenerError(e) => {
727				self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
728			},
729			FromSwarm::ExternalAddrExpired(e) => {
730				// We intentionally don't remove the element from `known_external_addresses` in
731				// order to not print the log line again.
732
733				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
734			},
735			FromSwarm::NewListener(e) => {
736				self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
737			},
738			FromSwarm::ExpiredListenAddr(e) => {
739				self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
740			},
741			FromSwarm::NewExternalAddrCandidate(e) => {
742				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
743			},
744			FromSwarm::AddressChange(e) => {
745				self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
746			},
747			FromSwarm::NewListenAddr(e) => {
748				self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
749				self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
750			},
751			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
752				let mut address = addr.clone();
753
754				if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
755					if peer_id != self.local_peer_id {
756						warn!(
757							target: "sub-libp2p",
758							"🔍 Discovered external address for a peer that is not us: {addr}",
759						);
760						// Ensure this address is not propagated to kademlia.
761						return
762					}
763				} else {
764					address.push(Protocol::P2p(self.local_peer_id));
765				}
766
767				if Self::can_add_to_dht(&address) {
768					// NOTE: we might re-discover the same address multiple times
769					// in which case we just want to refrain from logging.
770					if self.known_external_addresses.insert(address.clone()) {
771						info!(
772						  target: "sub-libp2p",
773						  "🔍 Discovered new external address for our node: {address}",
774						);
775					}
776				}
777
778				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
779			},
780		}
781	}
782
783	fn on_connection_handler_event(
784		&mut self,
785		peer_id: PeerId,
786		connection_id: ConnectionId,
787		event: THandlerOutEvent<Self>,
788	) {
789		self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
790	}
791
792	fn poll(
793		&mut self,
794		cx: &mut Context,
795		params: &mut impl PollParameters,
796	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
797		// Immediately process the content of `discovered`.
798		if let Some(ev) = self.pending_events.pop_front() {
799			return Poll::Ready(ToSwarm::GenerateEvent(ev))
800		}
801
802		// Poll the stream that fires when we need to start a random Kademlia query.
803		if let Some(kademlia) = self.kademlia.as_mut() {
804			if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
805				while next_kad_random_query.poll_unpin(cx).is_ready() {
806					let actually_started =
807						if self.num_connections < self.discovery_only_if_under_num {
808							let random_peer_id = PeerId::random();
809							debug!(
810								target: "sub-libp2p",
811								"Libp2p <= Starting random Kademlia request for {:?}",
812								random_peer_id,
813							);
814							kademlia.get_closest_peers(random_peer_id);
815							true
816						} else {
817							debug!(
818								target: "sub-libp2p",
819								"Kademlia paused due to high number of connections ({})",
820								self.num_connections
821							);
822							false
823						};
824
825					// Schedule the next random query with exponentially increasing delay,
826					// capped at 60 seconds.
827					*next_kad_random_query = Delay::new(self.duration_to_next_kad);
828					self.duration_to_next_kad =
829						cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
830
831					if actually_started {
832						let ev = DiscoveryOut::RandomKademliaStarted;
833						return Poll::Ready(ToSwarm::GenerateEvent(ev))
834					}
835				}
836			}
837		}
838
839		while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
840			match ev {
841				ToSwarm::GenerateEvent(ev) => match ev {
842					KademliaEvent::RoutingUpdated { peer, .. } => {
843						let ev = DiscoveryOut::Discovered(peer);
844						return Poll::Ready(ToSwarm::GenerateEvent(ev))
845					},
846					KademliaEvent::UnroutablePeer { peer, .. } => {
847						let ev = DiscoveryOut::UnroutablePeer(peer);
848						return Poll::Ready(ToSwarm::GenerateEvent(ev))
849					},
850					KademliaEvent::RoutablePeer { peer, .. } => {
851						let ev = DiscoveryOut::Discovered(peer);
852						return Poll::Ready(ToSwarm::GenerateEvent(ev))
853					},
854					KademliaEvent::PendingRoutablePeer { .. } => {
855						// We are not interested in this event at the moment.
856					},
857					KademliaEvent::InboundRequest { request } => match request {
858						libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
859							return Poll::Ready(ToSwarm::GenerateEvent(
860								DiscoveryOut::PutRecordRequest(
861									record.key,
862									record.value,
863									record.publisher.map(Into::into),
864									record.expires,
865								),
866							)),
867						_ => {},
868					},
869					KademliaEvent::OutboundQueryProgressed {
870						result: QueryResult::GetClosestPeers(res),
871						..
872					} => match res {
873						Err(GetClosestPeersError::Timeout { key, peers }) => {
874							debug!(
875								target: "sub-libp2p",
876								"Libp2p => Query for {:?} timed out with {} results",
877								HexDisplay::from(&key), peers.len(),
878							);
879						},
880						Ok(ok) => {
881							trace!(
882								target: "sub-libp2p",
883								"Libp2p => Query for {:?} yielded {:?} results",
884								HexDisplay::from(&ok.key), ok.peers.len(),
885							);
886							if ok.peers.is_empty() && self.num_connections != 0 {
887								debug!(
888									target: "sub-libp2p",
889									"Libp2p => Random Kademlia query has yielded empty results",
890								);
891							}
892						},
893					},
894					KademliaEvent::OutboundQueryProgressed {
895						result: QueryResult::GetRecord(res),
896						stats,
897						id,
898						..
899					} => {
900						let ev = match res {
901							Ok(GetRecordOk::FoundRecord(r)) => {
902								debug!(
903									target: "sub-libp2p",
904									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
905									r.record.key,
906									r.record.value,
907									id,
908									stats,
909								);
910
911								// Let's directly finish the query if we are above 4.
912								// This number is small enough to make sure we don't
913								// unnecessarily flood the network with queries, but high
914								// enough to make sure we also touch peers which might have
915								// old record, so that we can update them once we notice
916								// they have old records.
917								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
918									if let Some(kad) = self.kademlia.as_mut() {
919										if let Some(mut query) = kad.query_mut(&id) {
920											query.finish();
921										}
922									}
923								}
924
925								// Will be removed below when we receive
926								// `FinishedWithNoAdditionalRecord`.
927								self.records_to_publish.insert(id, r.record.clone());
928
929								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
930							},
931							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
932								cache_candidates,
933							}) => {
934								debug!(
935									target: "sub-libp2p",
936									"Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
937									id,
938									stats,
939									stats.duration().map(|val| val.as_millis())
940								);
941								// We always need to remove the record to not leak any data!
942								if let Some(record) = self.records_to_publish.remove(&id) {
943									if cache_candidates.is_empty() {
944										continue
945									}
946
947									// Put the record to the `cache_candidates` that are nearest to
948									// the record key from our point of view of the network.
949									if let Some(kad) = self.kademlia.as_mut() {
950										kad.put_record_to(
951											record,
952											cache_candidates.into_iter().map(|v| v.1),
953											Quorum::One,
954										);
955									}
956								}
957
958								continue
959							},
960							Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
961								trace!(
962									target: "sub-libp2p",
963									"Libp2p => Failed to get record: {:?}",
964									e,
965								);
966								DiscoveryOut::ValueNotFound(
967									e.into_key(),
968									stats.duration().unwrap_or_default(),
969								)
970							},
971							Err(e) => {
972								debug!(
973									target: "sub-libp2p",
974									"Libp2p => Failed to get record: {:?}",
975									e,
976								);
977								DiscoveryOut::ValueNotFound(
978									e.into_key(),
979									stats.duration().unwrap_or_default(),
980								)
981							},
982						};
983						return Poll::Ready(ToSwarm::GenerateEvent(ev))
984					},
985					KademliaEvent::OutboundQueryProgressed {
986						result: QueryResult::PutRecord(res),
987						stats,
988						..
989					} => {
990						let ev = match res {
991							Ok(ok) =>
992								DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()),
993							Err(e) => {
994								debug!(
995									target: "sub-libp2p",
996									"Libp2p => Failed to put record: {:?}",
997									e,
998								);
999								DiscoveryOut::ValuePutFailed(
1000									e.into_key(),
1001									stats.duration().unwrap_or_default(),
1002								)
1003							},
1004						};
1005						return Poll::Ready(ToSwarm::GenerateEvent(ev))
1006					},
1007					KademliaEvent::OutboundQueryProgressed {
1008						result: QueryResult::RepublishRecord(res),
1009						..
1010					} => match res {
1011						Ok(ok) => debug!(
1012							target: "sub-libp2p",
1013							"Libp2p => Record republished: {:?}",
1014							ok.key,
1015						),
1016						Err(e) => debug!(
1017							target: "sub-libp2p",
1018							"Libp2p => Republishing of record {:?} failed with: {:?}",
1019							e.key(), e,
1020						),
1021					},
1022					// We never start any other type of query.
1023					KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1024						warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
1025					},
1026				},
1027				ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1028				ToSwarm::NotifyHandler { peer_id, handler, event } =>
1029					return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
1030				ToSwarm::CloseConnection { peer_id, connection } =>
1031					return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
1032				ToSwarm::NewExternalAddrCandidate(observed) =>
1033					return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
1034				ToSwarm::ExternalAddrConfirmed(addr) =>
1035					return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
1036				ToSwarm::ExternalAddrExpired(addr) =>
1037					return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
1038				ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
1039				ToSwarm::RemoveListener { id } =>
1040					return Poll::Ready(ToSwarm::RemoveListener { id }),
1041			}
1042		}
1043
1044		// Poll mDNS.
1045		while let Poll::Ready(ev) = self.mdns.poll(cx, params) {
1046			match ev {
1047				ToSwarm::GenerateEvent(event) => match event {
1048					mdns::Event::Discovered(list) => {
1049						if self.num_connections >= self.discovery_only_if_under_num {
1050							continue
1051						}
1052
1053						self.pending_events.extend(
1054							list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1055						);
1056						if let Some(ev) = self.pending_events.pop_front() {
1057							return Poll::Ready(ToSwarm::GenerateEvent(ev))
1058						}
1059					},
1060					mdns::Event::Expired(_) => {},
1061				},
1062				ToSwarm::Dial { .. } => {
1063					unreachable!("mDNS never dials!");
1064				},
1065				// `event` is an enum with no variant
1066				ToSwarm::NotifyHandler { event, .. } => match event {},
1067				ToSwarm::CloseConnection { peer_id, connection } =>
1068					return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
1069				ToSwarm::NewExternalAddrCandidate(observed) =>
1070					return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
1071				ToSwarm::ExternalAddrConfirmed(addr) =>
1072					return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
1073				ToSwarm::ExternalAddrExpired(addr) =>
1074					return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
1075				ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
1076				ToSwarm::RemoveListener { id } =>
1077					return Poll::Ready(ToSwarm::RemoveListener { id }),
1078			}
1079		}
1080
1081		Poll::Pending
1082	}
1083}
1084
1085/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
1086fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1087	let name = format!("/{}/kad", id.as_ref());
1088	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1089}
1090
1091/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
1092fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1093	genesis_hash: Hash,
1094	fork_id: Option<&str>,
1095) -> StreamProtocol {
1096	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1097	let name = if let Some(fork_id) = fork_id {
1098		format!("/{genesis_hash_hex}/{fork_id}/kad")
1099	} else {
1100		format!("/{genesis_hash_hex}/kad")
1101	};
1102
1103	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108	use super::{
1109		kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig, DiscoveryOut,
1110	};
1111	use crate::config::ProtocolId;
1112	use futures::prelude::*;
1113	use libp2p::{
1114		core::{
1115			transport::{MemoryTransport, Transport},
1116			upgrade,
1117		},
1118		identity::Keypair,
1119		noise,
1120		swarm::{Executor, Swarm, SwarmEvent},
1121		yamux, Multiaddr,
1122	};
1123	use sp_core::hash::H256;
1124	use std::{collections::HashSet, pin::Pin, task::Poll};
1125
1126	struct TokioExecutor(tokio::runtime::Runtime);
1127	impl Executor for TokioExecutor {
1128		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1129			let _ = self.0.spawn(f);
1130		}
1131	}
1132
1133	#[test]
1134	fn discovery_working() {
1135		let mut first_swarm_peer_id_and_addr = None;
1136
1137		let genesis_hash = H256::from_low_u64_be(1);
1138		let fork_id = Some("test-fork-id");
1139		let protocol_id = ProtocolId::from("dot");
1140
1141		// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
1142		// the first swarm via `with_permanent_addresses`.
1143		let mut swarms = (0..25)
1144			.map(|i| {
1145				let keypair = Keypair::generate_ed25519();
1146
1147				let transport = MemoryTransport::new()
1148					.upgrade(upgrade::Version::V1)
1149					.authenticate(noise::Config::new(&keypair).unwrap())
1150					.multiplex(yamux::Config::default())
1151					.boxed();
1152
1153				let behaviour = {
1154					let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1155					config
1156						.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
1157						.allow_private_ip(true)
1158						.allow_non_globals_in_dht(true)
1159						.discovery_limit(50)
1160						.with_kademlia(genesis_hash, fork_id, &protocol_id);
1161
1162					config.finish()
1163				};
1164
1165				let runtime = tokio::runtime::Runtime::new().unwrap();
1166				#[allow(deprecated)]
1167				let mut swarm = libp2p::swarm::SwarmBuilder::with_executor(
1168					transport,
1169					behaviour,
1170					keypair.public().to_peer_id(),
1171					TokioExecutor(runtime),
1172				)
1173				.build();
1174
1175				let listen_addr: Multiaddr =
1176					format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1177
1178				if i == 0 {
1179					first_swarm_peer_id_and_addr =
1180						Some((keypair.public().to_peer_id(), listen_addr.clone()))
1181				}
1182
1183				swarm.listen_on(listen_addr.clone()).unwrap();
1184				(swarm, listen_addr)
1185			})
1186			.collect::<Vec<_>>();
1187
1188		// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
1189		let mut to_discover = (0..swarms.len())
1190			.map(|n| {
1191				(0..swarms.len())
1192					// Skip the first swarm as all other swarms already know it.
1193					.skip(1)
1194					.filter(|p| *p != n)
1195					.map(|p| *Swarm::local_peer_id(&swarms[p].0))
1196					.collect::<HashSet<_>>()
1197			})
1198			.collect::<Vec<_>>();
1199
1200		let fut = futures::future::poll_fn(move |cx| {
1201			'polling: loop {
1202				for swarm_n in 0..swarms.len() {
1203					match swarms[swarm_n].0.poll_next_unpin(cx) {
1204						Poll::Ready(Some(e)) => {
1205							match e {
1206								SwarmEvent::Behaviour(behavior) => {
1207									match behavior {
1208										DiscoveryOut::UnroutablePeer(other) |
1209										DiscoveryOut::Discovered(other) => {
1210											// Call `add_self_reported_address` to simulate identify
1211											// happening.
1212											let addr = swarms
1213												.iter()
1214												.find_map(|(s, a)| {
1215													if s.behaviour().local_peer_id == other {
1216														Some(a.clone())
1217													} else {
1218														None
1219													}
1220												})
1221												.unwrap();
1222											// Test both genesis hash-based and legacy
1223											// protocol names.
1224											let protocol_names = if swarm_n % 2 == 0 {
1225												vec![kademlia_protocol_name(genesis_hash, fork_id)]
1226											} else {
1227												vec![
1228													legacy_kademlia_protocol_name(&protocol_id),
1229													kademlia_protocol_name(genesis_hash, fork_id),
1230												]
1231											};
1232											swarms[swarm_n]
1233												.0
1234												.behaviour_mut()
1235												.add_self_reported_address(
1236													&other,
1237													protocol_names.as_slice(),
1238													addr,
1239												);
1240
1241											to_discover[swarm_n].remove(&other);
1242										},
1243										DiscoveryOut::RandomKademliaStarted => {},
1244										e => {
1245											panic!("Unexpected event: {:?}", e)
1246										},
1247									}
1248								},
1249								// ignore non Behaviour events
1250								_ => {},
1251							}
1252							continue 'polling
1253						},
1254						_ => {},
1255					}
1256				}
1257				break
1258			}
1259
1260			if to_discover.iter().all(|l| l.is_empty()) {
1261				Poll::Ready(())
1262			} else {
1263				Poll::Pending
1264			}
1265		});
1266
1267		futures::executor::block_on(fut);
1268	}
1269
1270	#[test]
1271	fn discovery_ignores_peers_with_unknown_protocols() {
1272		let supported_genesis_hash = H256::from_low_u64_be(1);
1273		let unsupported_genesis_hash = H256::from_low_u64_be(2);
1274		let supported_protocol_id = ProtocolId::from("a");
1275		let unsupported_protocol_id = ProtocolId::from("b");
1276
1277		let mut discovery = {
1278			let keypair = Keypair::generate_ed25519();
1279			let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1280			config
1281				.allow_private_ip(true)
1282				.allow_non_globals_in_dht(true)
1283				.discovery_limit(50)
1284				.with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
1285			config.finish()
1286		};
1287
1288		let predictable_peer_id = |bytes: &[u8; 32]| {
1289			Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
1290		};
1291
1292		let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
1293		let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
1294		let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1295		let another_addr: Multiaddr = "/memory/2".parse().unwrap();
1296
1297		// Try adding remote peers with unsupported protocols.
1298		discovery.add_self_reported_address(
1299			&remote_peer_id,
1300			&[kademlia_protocol_name(unsupported_genesis_hash, None)],
1301			remote_addr.clone(),
1302		);
1303		discovery.add_self_reported_address(
1304			&another_peer_id,
1305			&[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
1306			another_addr.clone(),
1307		);
1308
1309		{
1310			let kademlia = discovery.kademlia.as_mut().unwrap();
1311			assert!(
1312				kademlia
1313					.kbucket(remote_peer_id)
1314					.expect("Remote peer id not to be equal to local peer id.")
1315					.is_empty(),
1316				"Expect peer with unsupported protocol not to be added."
1317			);
1318			assert!(
1319				kademlia
1320					.kbucket(another_peer_id)
1321					.expect("Remote peer id not to be equal to local peer id.")
1322					.is_empty(),
1323				"Expect peer with unsupported protocol not to be added."
1324			);
1325		}
1326
1327		// Add remote peers with supported protocols.
1328		discovery.add_self_reported_address(
1329			&remote_peer_id,
1330			&[kademlia_protocol_name(supported_genesis_hash, None)],
1331			remote_addr.clone(),
1332		);
1333		{
1334			let kademlia = discovery.kademlia.as_mut().unwrap();
1335			assert!(
1336				!kademlia
1337					.kbucket(remote_peer_id)
1338					.expect("Remote peer id not to be equal to local peer id.")
1339					.is_empty(),
1340				"Expect peer with supported protocol to be added."
1341			);
1342		}
1343
1344		let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1345		let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();
1346
1347		// Check the unsupported peer is not present before and after the call.
1348		{
1349			let kademlia = discovery.kademlia.as_mut().unwrap();
1350			assert!(
1351				kademlia
1352					.kbucket(unsupported_peer_id)
1353					.expect("Remote peer id not to be equal to local peer id.")
1354					.is_empty(),
1355				"Expect unsupported peer not to be added."
1356			);
1357		}
1358		// Note: legacy protocol is not supported without genesis hash and fork ID,
1359		// if the legacy is the only protocol supported, then the peer will not be added.
1360		discovery.add_self_reported_address(
1361			&unsupported_peer_id,
1362			&[legacy_kademlia_protocol_name(&supported_protocol_id)],
1363			unsupported_peer_addr.clone(),
1364		);
1365		{
1366			let kademlia = discovery.kademlia.as_mut().unwrap();
1367			assert!(
1368				kademlia
1369					.kbucket(unsupported_peer_id)
1370					.expect("Remote peer id not to be equal to local peer id.")
1371					.is_empty(),
1372				"Expect unsupported peer not to be added."
1373			);
1374		}
1375
1376		// Supported legacy and genesis based protocols are allowed to be added.
1377		discovery.add_self_reported_address(
1378			&another_peer_id,
1379			&[
1380				legacy_kademlia_protocol_name(&supported_protocol_id),
1381				kademlia_protocol_name(supported_genesis_hash, None),
1382			],
1383			another_addr.clone(),
1384		);
1385
1386		{
1387			let kademlia = discovery.kademlia.as_mut().unwrap();
1388			assert_eq!(
1389				2,
1390				kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
1391				"Expect peers with supported protocol to be added."
1392			);
1393			assert!(
1394				!kademlia
1395					.kbucket(another_peer_id)
1396					.expect("Remote peer id not to be equal to local peer id.")
1397					.is_empty(),
1398				"Expect peer with supported protocol to be added."
1399			);
1400		}
1401	}
1402}