1use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56 core::{Endpoint, Multiaddr},
57 kad::{
58 self,
59 record::store::{MemoryStore, RecordStore},
60 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61 GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
62 RecordKey,
63 },
64 mdns::{self, tokio::Behaviour as TokioMdns},
65 multiaddr::Protocol,
66 swarm::{
67 behaviour::{
68 toggle::{Toggle, ToggleConnectionHandler},
69 DialFailure, ExternalAddrConfirmed, FromSwarm,
70 },
71 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters,
72 StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
73 },
74 PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80 cmp,
81 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82 num::NonZeroUsize,
83 task::{Context, Poll},
84 time::{Duration, Instant},
85};
86
/// Capacity of the LRU set that remembers the confirmed external addresses of the local node.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Kademlia replication factor used by [`DiscoveryConfig::new`] unless it is overridden with
/// [`DiscoveryConfig::with_kademlia_replication_factor`].
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// Once a `GetRecord` query has reported more than this many successes, the query is finished
/// early instead of being left to run to completion.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
98
/// Builder for a [`DiscoveryBehaviour`].
///
/// Create it with [`DiscoveryConfig::new`], adjust it through the setter methods and turn it
/// into the behaviour with [`DiscoveryConfig::finish`].
pub struct DiscoveryConfig {
    /// Identity of the local node.
    local_peer_id: PeerId,
    /// Addresses that are handed to Kademlia at construction time and always offered first
    /// when dialing the matching peer.
    permanent_addresses: Vec<(PeerId, Multiaddr)>,
    /// Whether the periodic random Kademlia query timer is armed.
    dht_random_walk: bool,
    /// Whether non-global IP addresses reported by Kademlia/mDNS may be dialed.
    allow_private_ip: bool,
    /// Whether self-reported addresses that fail `can_add_to_dht` may still be added.
    allow_non_globals_in_dht: bool,
    /// Discovery is skipped while the number of connections is at or above this value.
    discovery_only_if_under_num: u64,
    /// Whether mDNS should be started.
    enable_mdns: bool,
    /// Forwarded to the Kademlia configuration's `disjoint_query_paths`.
    kademlia_disjoint_query_paths: bool,
    /// Primary Kademlia protocol name; Kademlia is disabled while this is `None`.
    kademlia_protocol: Option<StreamProtocol>,
    /// Legacy Kademlia protocol name, registered in addition to the primary one.
    kademlia_legacy_protocol: Option<StreamProtocol>,
    /// Replication factor passed to the Kademlia configuration.
    kademlia_replication_factor: NonZeroUsize,
}
117
118impl DiscoveryConfig {
119 pub fn new(local_peer_id: PeerId) -> Self {
121 Self {
122 local_peer_id,
123 permanent_addresses: Vec::new(),
124 dht_random_walk: true,
125 allow_private_ip: true,
126 allow_non_globals_in_dht: false,
127 discovery_only_if_under_num: std::u64::MAX,
128 enable_mdns: false,
129 kademlia_disjoint_query_paths: false,
130 kademlia_protocol: None,
131 kademlia_legacy_protocol: None,
132 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
133 .expect("value is a constant; constant is non-zero; qed."),
134 }
135 }
136
    /// Sets the connection count at or above which discovery (random Kademlia queries and
    /// mDNS results) is skipped. Returns `self` for chaining.
    pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
        self.discovery_only_if_under_num = limit;
        self
    }
142
    /// Appends `permanent_addresses` to the list of permanent `(peer, address)` pairs.
    ///
    /// Existing entries are kept; calling this several times accumulates addresses.
    /// Returns `self` for chaining.
    pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
    where
        I: IntoIterator<Item = (PeerId, Multiaddr)>,
    {
        self.permanent_addresses.extend(permanent_addresses);
        self
    }
151
    /// Enables or disables the periodic random Kademlia query. When disabled, the behaviour is
    /// built without a random-query timer. Returns `self` for chaining.
    pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
        self.dht_random_walk = value;
        self
    }
158
    /// Sets whether non-global IP addresses reported by Kademlia or mDNS may be returned as
    /// dial candidates. Returns `self` for chaining.
    pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
        self.allow_private_ip = value;
        self
    }
164
    /// Sets whether self-reported addresses that do not pass
    /// [`DiscoveryBehaviour::can_add_to_dht`] may still be added to Kademlia.
    /// Returns `self` for chaining.
    pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
        self.allow_non_globals_in_dht = value;
        self
    }
170
    /// Sets whether mDNS should be started when the behaviour is built.
    /// Returns `self` for chaining.
    pub fn with_mdns(&mut self, value: bool) -> &mut Self {
        self.enable_mdns = value;
        self
    }
176
177 pub fn with_kademlia<Hash: AsRef<[u8]>>(
182 &mut self,
183 genesis_hash: Hash,
184 fork_id: Option<&str>,
185 protocol_id: &ProtocolId,
186 ) -> &mut Self {
187 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
188 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
189 self
190 }
191
    /// Sets the value later passed to the Kademlia configuration's `disjoint_query_paths`.
    /// Returns `self` for chaining.
    pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
        self.kademlia_disjoint_query_paths = value;
        self
    }
198
    /// Overrides the Kademlia replication factor (defaults to
    /// [`DEFAULT_KADEMLIA_REPLICATION_FACTOR`]). Returns `self` for chaining.
    pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
        self.kademlia_replication_factor = value;
        self
    }
204
205 pub fn finish(self) -> DiscoveryBehaviour {
207 let Self {
208 local_peer_id,
209 permanent_addresses,
210 dht_random_walk,
211 allow_private_ip,
212 allow_non_globals_in_dht,
213 discovery_only_if_under_num,
214 enable_mdns,
215 kademlia_disjoint_query_paths,
216 kademlia_protocol,
217 kademlia_legacy_protocol,
218 kademlia_replication_factor,
219 } = self;
220
221 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
222 let mut config = KademliaConfig::default();
223
224 config.set_replication_factor(kademlia_replication_factor);
225 let kademlia_protocols = if let Some(legacy_protocol) = kademlia_legacy_protocol {
229 vec![kademlia_protocol.clone(), legacy_protocol]
230 } else {
231 vec![kademlia_protocol.clone()]
232 };
233 config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());
234
235 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
236
237 config.set_kbucket_inserts(BucketInserts::Manual);
241 config.disjoint_query_paths(kademlia_disjoint_query_paths);
242 let store = MemoryStore::new(local_peer_id);
243 let mut kad = Kademlia::with_config(local_peer_id, store, config);
244 kad.set_mode(Some(kad::Mode::Server));
245
246 for (peer_id, addr) in &permanent_addresses {
247 kad.add_address(peer_id, addr.clone());
248 }
249
250 Some(kad)
251 } else {
252 None
253 };
254
255 DiscoveryBehaviour {
256 permanent_addresses,
257 ephemeral_addresses: HashMap::new(),
258 kademlia: Toggle::from(kademlia),
259 next_kad_random_query: if dht_random_walk {
260 Some(Delay::new(Duration::new(0, 0)))
261 } else {
262 None
263 },
264 duration_to_next_kad: Duration::from_secs(1),
265 pending_events: VecDeque::new(),
266 local_peer_id,
267 num_connections: 0,
268 allow_private_ip,
269 discovery_only_if_under_num,
270 mdns: if enable_mdns {
271 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
272 Ok(mdns) => Toggle::from(Some(mdns)),
273 Err(err) => {
274 warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
275 Toggle::from(None)
276 },
277 }
278 } else {
279 Toggle::from(None)
280 },
281 allow_non_globals_in_dht,
282 known_external_addresses: LruHashSet::new(
283 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
284 .expect("value is a constant; constant is non-zero; qed."),
285 ),
286 records_to_publish: Default::default(),
287 kademlia_protocol,
288 }
289 }
290}
291
/// Network behaviour that combines Kademlia and mDNS to discover peers and to read and write
/// DHT records. Built with [`DiscoveryConfig::finish`].
pub struct DiscoveryBehaviour {
    /// Addresses supplied by the configuration. They are offered first when dialing the
    /// matching peer.
    permanent_addresses: Vec<(PeerId, Multiaddr)>,
    /// Addresses registered through `add_known_address`. An address is dropped again when a
    /// dial to it fails with a transport error.
    ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
    /// Kademlia behaviour; disabled when no Kademlia protocol name was configured.
    kademlia: Toggle<Kademlia<MemoryStore>>,
    /// mDNS behaviour; disabled unless enabled in the configuration and successfully started.
    mdns: Toggle<TokioMdns>,
    /// Timer that triggers the next random Kademlia query; `None` when the random walk is off.
    next_kad_random_query: Option<Delay>,
    /// Delay used when re-arming `next_kad_random_query`; doubled after each firing, capped at
    /// 60 seconds.
    duration_to_next_kad: Duration,
    /// Events queued for the swarm; drained at the start of `poll`.
    pending_events: VecDeque<DiscoveryOut>,
    /// Identity of the local node.
    local_peer_id: PeerId,
    /// Number of currently established connections, maintained from swarm events.
    num_connections: u64,
    /// When `false`, non-global IP addresses from Kademlia/mDNS are filtered out of dial lists.
    allow_private_ip: bool,
    /// Random Kademlia queries and mDNS discoveries are skipped while `num_connections` is at
    /// or above this value.
    discovery_only_if_under_num: u64,
    /// When `false`, self-reported addresses failing `can_add_to_dht` are ignored.
    allow_non_globals_in_dht: bool,
    /// Confirmed external addresses of the local node, used to log each one only once.
    known_external_addresses: LruHashSet<Multiaddr>,
    /// Records found by in-flight `GetRecord` queries, keyed by query id, kept until the query
    /// reports `FinishedWithNoAdditionalRecord` so they can be pushed to cache candidates.
    records_to_publish: HashMap<QueryId, Record>,
    /// Primary Kademlia protocol name. A remote must advertise it for its self-reported
    /// address to be added to Kademlia.
    kademlia_protocol: Option<StreamProtocol>,
}
337
338impl DiscoveryBehaviour {
339 pub fn known_peers(&mut self) -> HashSet<PeerId> {
341 let mut peers = HashSet::new();
342 if let Some(k) = self.kademlia.as_mut() {
343 for b in k.kbuckets() {
344 for e in b.iter() {
345 if !peers.contains(e.node.key.preimage()) {
346 peers.insert(*e.node.key.preimage());
347 }
348 }
349 }
350 }
351 peers
352 }
353
354 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
360 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
361 if addrs_list.contains(&addr) {
362 return
363 }
364
365 if let Some(k) = self.kademlia.as_mut() {
366 k.add_address(&peer_id, addr.clone());
367 }
368
369 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
370 addrs_list.push(addr);
371 }
372
373 pub fn add_self_reported_address(
379 &mut self,
380 peer_id: &PeerId,
381 supported_protocols: &[StreamProtocol],
382 addr: Multiaddr,
383 ) {
384 if let Some(kademlia) = self.kademlia.as_mut() {
385 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
386 trace!(
387 target: "sub-libp2p",
388 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
389 );
390 return
391 }
392
393 if !supported_protocols.iter().any(|p| {
399 p == self
400 .kademlia_protocol
401 .as_ref()
402 .expect("kademlia protocol was checked above to be enabled; qed")
403 }) {
404 trace!(
405 target: "sub-libp2p",
406 "Ignoring self-reported address {} from {} as remote node is not part of the \
407 Kademlia DHT supported by the local node.", addr, peer_id,
408 );
409 return
410 }
411
412 trace!(
413 target: "sub-libp2p",
414 "Adding self-reported address {} from {} to Kademlia DHT.",
415 addr, peer_id
416 );
417 kademlia.add_address(peer_id, addr.clone());
418 }
419 }
420
421 pub fn get_value(&mut self, key: RecordKey) {
425 if let Some(k) = self.kademlia.as_mut() {
426 k.get_record(key.clone());
427 }
428 }
429
430 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
435 if let Some(k) = self.kademlia.as_mut() {
436 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
437 warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e);
438 self.pending_events
439 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
440 }
441 }
442 }
443
444 pub fn put_record_to(
448 &mut self,
449 record: Record,
450 peers: HashSet<sc_network_types::PeerId>,
451 update_local_storage: bool,
452 ) {
453 if let Some(kad) = self.kademlia.as_mut() {
454 if update_local_storage {
455 if let Err(_e) = kad.store_mut().put(record.clone()) {
456 warn!(target: "sub-libp2p", "Failed to update local starage");
457 }
458 }
459
460 if !peers.is_empty() {
461 kad.put_record_to(
462 record,
463 peers.into_iter().map(|peer_id| peer_id.into()),
464 Quorum::All,
465 );
466 }
467 }
468 }
469 pub fn store_record(
471 &mut self,
472 record_key: RecordKey,
473 record_value: Vec<u8>,
474 publisher: Option<PeerId>,
475 expires: Option<Instant>,
476 ) {
477 if let Some(k) = self.kademlia.as_mut() {
478 if let Err(err) = k.store_mut().put(Record {
479 key: record_key,
480 value: record_value,
481 publisher: publisher.map(|publisher| publisher.into()),
482 expires,
483 }) {
484 debug!(
485 target: "sub-libp2p",
486 "Failed to store record with key: {:?}",
487 err
488 );
489 }
490 }
491 }
492
493 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
498 self.kademlia.as_mut().map(|kad| {
499 kad.kbuckets()
500 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
501 .collect()
502 })
503 }
504
505 pub fn num_kademlia_records(&mut self) -> Option<usize> {
507 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
509 }
510
511 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
513 self.kademlia
516 .as_mut()
517 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
518 }
519
520 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
527 let ip = match addr.iter().next() {
528 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
529 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
530 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
531 return true,
532 _ => return false,
533 };
534 ip.is_global()
535 }
536}
537
/// Events emitted by [`DiscoveryBehaviour`] towards the swarm.
#[derive(Debug)]
pub enum DiscoveryOut {
    /// A peer became known: through `add_known_address`, a Kademlia routing update or routable
    /// peer notification, or an mDNS discovery.
    Discovered(PeerId),

    /// Kademlia reported the peer as unroutable.
    UnroutablePeer(PeerId),

    /// A `GetRecord` query found a record. The `Duration` is the query duration reported by
    /// the query statistics (zero if unavailable).
    ValueFound(PeerRecord, Duration),

    /// A remote peer sent an inbound `PutRecord` request carrying a record. Fields are the
    /// record's key, value, publisher and expiry.
    PutRecordRequest(
        RecordKey,
        Vec<u8>,
        Option<sc_network_types::PeerId>,
        Option<std::time::Instant>,
    ),

    /// A `GetRecord` query failed for the given key, with the query duration.
    ValueNotFound(RecordKey, Duration),

    /// A `PutRecord` query succeeded for the given key, with the query duration.
    ValuePut(RecordKey, Duration),

    /// A `PutRecord` query failed (or could not be started) for the given key, with the query
    /// duration.
    ValuePutFailed(RecordKey, Duration),

    /// A random Kademlia `get_closest_peers` query has just been started.
    RandomKademliaStarted,
}
589
590impl NetworkBehaviour for DiscoveryBehaviour {
    // Only Kademlia contributes a connection handler; it is wrapped in a toggle because
    // Kademlia itself may be disabled.
    type ConnectionHandler =
        ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
    // Events surfaced to the swarm by this behaviour.
    type ToSwarm = DiscoveryOut;
594
    /// Delegates handler creation for an established inbound connection to Kademlia.
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.kademlia.handle_established_inbound_connection(
            connection_id,
            peer,
            local_addr,
            remote_addr,
        )
    }
609
    /// Delegates handler creation for an established outbound connection to Kademlia.
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.kademlia.handle_established_outbound_connection(
            connection_id,
            peer,
            addr,
            role_override,
        )
    }
624
    /// Delegates the decision about a pending inbound connection to Kademlia.
    fn handle_pending_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<(), ConnectionDenied> {
        self.kademlia
            .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
    }
634
635 fn handle_pending_outbound_connection(
636 &mut self,
637 connection_id: ConnectionId,
638 maybe_peer: Option<PeerId>,
639 addresses: &[Multiaddr],
640 effective_role: Endpoint,
641 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
642 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
643
644 let mut list: LinkedHashSet<_> = self
649 .permanent_addresses
650 .iter()
651 .filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
652 .collect();
653
654 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
655 ephemeral_addresses.iter().for_each(|address| {
656 list.insert_if_absent(address.clone());
657 });
658 }
659
660 {
661 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
662 connection_id,
663 maybe_peer,
664 addresses,
665 effective_role,
666 )?;
667
668 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
669 connection_id,
670 maybe_peer,
671 addresses,
672 effective_role,
673 )?);
674
675 if !self.allow_private_ip {
676 list_to_filter.retain(|addr| match addr.iter().next() {
677 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
678 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
679 _ => true,
680 });
681 }
682
683 list_to_filter.into_iter().for_each(|address| {
684 list.insert_if_absent(address);
685 });
686 }
687
688 trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);
689
690 Ok(list.into_iter().collect())
691 }
692
693 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
694 match event {
695 FromSwarm::ConnectionEstablished(e) => {
696 self.num_connections += 1;
697 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
698 },
699 FromSwarm::ConnectionClosed(e) => {
700 self.num_connections -= 1;
701 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
702 },
703 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
704 if let Some(peer_id) = peer_id {
705 if let DialError::Transport(errors) = error {
706 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
707 {
708 for (addr, _error) in errors {
709 entry.get_mut().retain(|a| a != addr);
710 }
711 if entry.get().is_empty() {
712 entry.remove();
713 }
714 }
715 }
716 }
717
718 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
719 },
720 FromSwarm::ListenerClosed(e) => {
721 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
722 },
723 FromSwarm::ListenFailure(e) => {
724 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
725 },
726 FromSwarm::ListenerError(e) => {
727 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
728 },
729 FromSwarm::ExternalAddrExpired(e) => {
730 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
734 },
735 FromSwarm::NewListener(e) => {
736 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
737 },
738 FromSwarm::ExpiredListenAddr(e) => {
739 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
740 },
741 FromSwarm::NewExternalAddrCandidate(e) => {
742 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
743 },
744 FromSwarm::AddressChange(e) => {
745 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
746 },
747 FromSwarm::NewListenAddr(e) => {
748 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
749 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
750 },
751 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
752 let mut address = addr.clone();
753
754 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
755 if peer_id != self.local_peer_id {
756 warn!(
757 target: "sub-libp2p",
758 "🔍 Discovered external address for a peer that is not us: {addr}",
759 );
760 return
762 }
763 } else {
764 address.push(Protocol::P2p(self.local_peer_id));
765 }
766
767 if Self::can_add_to_dht(&address) {
768 if self.known_external_addresses.insert(address.clone()) {
771 info!(
772 target: "sub-libp2p",
773 "🔍 Discovered new external address for our node: {address}",
774 );
775 }
776 }
777
778 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
779 },
780 }
781 }
782
    /// Forwards a connection handler event to Kademlia, the only sub-behaviour with a handler.
    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
    }
791
    /// Drives the behaviour and returns the next action for the swarm.
    ///
    /// Order of processing:
    /// 1. A queued event from `pending_events`, if any.
    /// 2. The random Kademlia query timer (only when Kademlia and the random walk are enabled).
    /// 3. Kademlia events, translated into [`DiscoveryOut`] events or forwarded as-is.
    /// 4. mDNS events.
    ///
    /// Returns `Poll::Pending` when none of the above produced anything.
    fn poll(
        &mut self,
        cx: &mut Context,
        params: &mut impl PollParameters,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        // Immediately process the content of `pending_events`.
        if let Some(ev) = self.pending_events.pop_front() {
            return Poll::Ready(ToSwarm::GenerateEvent(ev))
        }

        // Poll the timer that fires when a random Kademlia query should be started.
        if let Some(kademlia) = self.kademlia.as_mut() {
            if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
                while next_kad_random_query.poll_unpin(cx).is_ready() {
                    // The query is only started while below the configured connection limit.
                    let actually_started =
                        if self.num_connections < self.discovery_only_if_under_num {
                            let random_peer_id = PeerId::random();
                            debug!(
                                target: "sub-libp2p",
                                "Libp2p <= Starting random Kademlia request for {:?}",
                                random_peer_id,
                            );
                            kademlia.get_closest_peers(random_peer_id);
                            true
                        } else {
                            debug!(
                                target: "sub-libp2p",
                                "Kademlia paused due to high number of connections ({})",
                                self.num_connections
                            );
                            false
                        };

                    // Re-arm the timer, then double the delay for next time (capped at 60
                    // seconds).
                    *next_kad_random_query = Delay::new(self.duration_to_next_kad);
                    self.duration_to_next_kad =
                        cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));

                    if actually_started {
                        let ev = DiscoveryOut::RandomKademliaStarted;
                        return Poll::Ready(ToSwarm::GenerateEvent(ev))
                    }
                }
            }
        }

        while let Poll::Ready(ev) = self.kademlia.poll(cx, params) {
            match ev {
                ToSwarm::GenerateEvent(ev) => match ev {
                    KademliaEvent::RoutingUpdated { peer, .. } => {
                        let ev = DiscoveryOut::Discovered(peer);
                        return Poll::Ready(ToSwarm::GenerateEvent(ev))
                    },
                    KademliaEvent::UnroutablePeer { peer, .. } => {
                        let ev = DiscoveryOut::UnroutablePeer(peer);
                        return Poll::Ready(ToSwarm::GenerateEvent(ev))
                    },
                    KademliaEvent::RoutablePeer { peer, .. } => {
                        let ev = DiscoveryOut::Discovered(peer);
                        return Poll::Ready(ToSwarm::GenerateEvent(ev))
                    },
                    KademliaEvent::PendingRoutablePeer { .. } => {
                        // Intentionally ignored.
                    },
                    // Only inbound `PutRecord` requests that carry a record are surfaced;
                    // all other inbound requests are ignored.
                    KademliaEvent::InboundRequest { request } => match request {
                        libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
                            return Poll::Ready(ToSwarm::GenerateEvent(
                                DiscoveryOut::PutRecordRequest(
                                    record.key,
                                    record.value,
                                    record.publisher.map(Into::into),
                                    record.expires,
                                ),
                            )),
                        _ => {},
                    },
                    // Results of `get_closest_peers` queries are only logged.
                    KademliaEvent::OutboundQueryProgressed {
                        result: QueryResult::GetClosestPeers(res),
                        ..
                    } => match res {
                        Err(GetClosestPeersError::Timeout { key, peers }) => {
                            debug!(
                                target: "sub-libp2p",
                                "Libp2p => Query for {:?} timed out with {} results",
                                HexDisplay::from(&key), peers.len(),
                            );
                        },
                        Ok(ok) => {
                            trace!(
                                target: "sub-libp2p",
                                "Libp2p => Query for {:?} yielded {:?} results",
                                HexDisplay::from(&ok.key), ok.peers.len(),
                            );
                            if ok.peers.is_empty() && self.num_connections != 0 {
                                debug!(
                                    target: "sub-libp2p",
                                    "Libp2p => Random Kademlia query has yielded empty results",
                                );
                            }
                        },
                    },
                    KademliaEvent::OutboundQueryProgressed {
                        result: QueryResult::GetRecord(res),
                        stats,
                        id,
                        ..
                    } => {
                        let ev = match res {
                            Ok(GetRecordOk::FoundRecord(r)) => {
                                debug!(
                                    target: "sub-libp2p",
                                    "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
                                    r.record.key,
                                    r.record.value,
                                    id,
                                    stats,
                                );

                                // Finish the query early once more than
                                // `GET_RECORD_REDUNDANCY_FACTOR` successes were reported.
                                if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
                                    if let Some(kad) = self.kademlia.as_mut() {
                                        if let Some(mut query) = kad.query_mut(&id) {
                                            query.finish();
                                        }
                                    }
                                }

                                // Remembered until `FinishedWithNoAdditionalRecord` arrives for
                                // the same query id, where it is removed again.
                                self.records_to_publish.insert(id, r.record.clone());

                                DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
                            },
                            Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
                                cache_candidates,
                            }) => {
                                debug!(
                                    target: "sub-libp2p",
                                    "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
                                    id,
                                    stats,
                                    stats.duration().map(|val| val.as_millis())
                                );
                                // The stored record is always removed here so that
                                // `records_to_publish` does not grow without bound.
                                if let Some(record) = self.records_to_publish.remove(&id) {
                                    if cache_candidates.is_empty() {
                                        continue
                                    }

                                    // Push the found record to the cache candidates reported by
                                    // the query.
                                    if let Some(kad) = self.kademlia.as_mut() {
                                        kad.put_record_to(
                                            record,
                                            cache_candidates.into_iter().map(|v| v.1),
                                            Quorum::One,
                                        );
                                    }
                                }

                                // No event is generated for this result.
                                continue
                            },
                            // `NotFound` is logged at trace level, other errors at debug level;
                            // both are reported as `ValueNotFound`.
                            Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
                                trace!(
                                    target: "sub-libp2p",
                                    "Libp2p => Failed to get record: {:?}",
                                    e,
                                );
                                DiscoveryOut::ValueNotFound(
                                    e.into_key(),
                                    stats.duration().unwrap_or_default(),
                                )
                            },
                            Err(e) => {
                                debug!(
                                    target: "sub-libp2p",
                                    "Libp2p => Failed to get record: {:?}",
                                    e,
                                );
                                DiscoveryOut::ValueNotFound(
                                    e.into_key(),
                                    stats.duration().unwrap_or_default(),
                                )
                            },
                        };
                        return Poll::Ready(ToSwarm::GenerateEvent(ev))
                    },
                    KademliaEvent::OutboundQueryProgressed {
                        result: QueryResult::PutRecord(res),
                        stats,
                        ..
                    } => {
                        let ev = match res {
                            Ok(ok) =>
                                DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()),
                            Err(e) => {
                                debug!(
                                    target: "sub-libp2p",
                                    "Libp2p => Failed to put record: {:?}",
                                    e,
                                );
                                DiscoveryOut::ValuePutFailed(
                                    e.into_key(),
                                    stats.duration().unwrap_or_default(),
                                )
                            },
                        };
                        return Poll::Ready(ToSwarm::GenerateEvent(ev))
                    },
                    // Republish results are only logged.
                    KademliaEvent::OutboundQueryProgressed {
                        result: QueryResult::RepublishRecord(res),
                        ..
                    } => match res {
                        Ok(ok) => debug!(
                            target: "sub-libp2p",
                            "Libp2p => Record republished: {:?}",
                            ok.key,
                        ),
                        Err(e) => debug!(
                            target: "sub-libp2p",
                            "Libp2p => Republishing of record {:?} failed with: {:?}",
                            e.key(), e,
                        ),
                    },
                    // Any other query result is unexpected and only logged as a warning.
                    KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
                        warn!(target: "sub-libp2p", "Libp2p => Unhandled Kademlia event: {:?}", e)
                    },
                },
                // All non-event actions requested by Kademlia are passed through unchanged.
                ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
                ToSwarm::NotifyHandler { peer_id, handler, event } =>
                    return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
                ToSwarm::CloseConnection { peer_id, connection } =>
                    return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
                ToSwarm::NewExternalAddrCandidate(observed) =>
                    return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
                ToSwarm::ExternalAddrConfirmed(addr) =>
                    return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
                ToSwarm::ExternalAddrExpired(addr) =>
                    return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
                ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
                ToSwarm::RemoveListener { id } =>
                    return Poll::Ready(ToSwarm::RemoveListener { id }),
            }
        }

        // Poll mDNS.
        while let Poll::Ready(ev) = self.mdns.poll(cx, params) {
            match ev {
                ToSwarm::GenerateEvent(event) => match event {
                    mdns::Event::Discovered(list) => {
                        // mDNS discoveries are dropped while at or above the connection limit.
                        if self.num_connections >= self.discovery_only_if_under_num {
                            continue
                        }

                        // Queue one `Discovered` event per peer and emit the first one now; the
                        // rest are returned by later calls to `poll`.
                        self.pending_events.extend(
                            list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
                        );
                        if let Some(ev) = self.pending_events.pop_front() {
                            return Poll::Ready(ToSwarm::GenerateEvent(ev))
                        }
                    },
                    mdns::Event::Expired(_) => {},
                },
                ToSwarm::Dial { .. } => {
                    unreachable!("mDNS never dials!");
                },
                // The empty match only type-checks if the handler event type has no values.
                ToSwarm::NotifyHandler { event, .. } => match event {},
                ToSwarm::CloseConnection { peer_id, connection } =>
                    return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
                ToSwarm::NewExternalAddrCandidate(observed) =>
                    return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
                ToSwarm::ExternalAddrConfirmed(addr) =>
                    return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
                ToSwarm::ExternalAddrExpired(addr) =>
                    return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
                ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
                ToSwarm::RemoveListener { id } =>
                    return Poll::Ready(ToSwarm::RemoveListener { id }),
            }
        }

        Poll::Pending
    }
1083}
1084
1085fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1087 let name = format!("/{}/kad", id.as_ref());
1088 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1089}
1090
1091fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1093 genesis_hash: Hash,
1094 fork_id: Option<&str>,
1095) -> StreamProtocol {
1096 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1097 let name = if let Some(fork_id) = fork_id {
1098 format!("/{genesis_hash_hex}/{fork_id}/kad")
1099 } else {
1100 format!("/{genesis_hash_hex}/kad")
1101 };
1102
1103 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108 use super::{
1109 kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig, DiscoveryOut,
1110 };
1111 use crate::config::ProtocolId;
1112 use futures::prelude::*;
1113 use libp2p::{
1114 core::{
1115 transport::{MemoryTransport, Transport},
1116 upgrade,
1117 },
1118 identity::Keypair,
1119 noise,
1120 swarm::{Executor, Swarm, SwarmEvent},
1121 yamux, Multiaddr,
1122 };
1123 use sp_core::hash::H256;
1124 use std::{collections::HashSet, pin::Pin, task::Poll};
1125
    /// Swarm executor for the tests that spawns futures onto an owned tokio runtime.
    struct TokioExecutor(tokio::runtime::Runtime);
    impl Executor for TokioExecutor {
        fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
            // The join handle is intentionally dropped; the task keeps running detached.
            let _ = self.0.spawn(f);
        }
    }
1132
    /// Starts 25 in-memory swarms that initially only know the first one and checks that every
    /// swarm eventually discovers all the others.
    #[test]
    fn discovery_working() {
        // Filled in while building the first swarm; every later swarm receives it as its
        // permanent address.
        let mut first_swarm_peer_id_and_addr = None;

        let genesis_hash = H256::from_low_u64_be(1);
        let fork_id = Some("test-fork-id");
        let protocol_id = ProtocolId::from("dot");

        // Build the swarms. The first one has no permanent address, the others know the first.
        let mut swarms = (0..25)
            .map(|i| {
                let keypair = Keypair::generate_ed25519();

                let transport = MemoryTransport::new()
                    .upgrade(upgrade::Version::V1)
                    .authenticate(noise::Config::new(&keypair).unwrap())
                    .multiplex(yamux::Config::default())
                    .boxed();

                let behaviour = {
                    let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
                    config
                        .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
                        .allow_private_ip(true)
                        .allow_non_globals_in_dht(true)
                        .discovery_limit(50)
                        .with_kademlia(genesis_hash, fork_id, &protocol_id);

                    config.finish()
                };

                let runtime = tokio::runtime::Runtime::new().unwrap();
                #[allow(deprecated)]
                let mut swarm = libp2p::swarm::SwarmBuilder::with_executor(
                    transport,
                    behaviour,
                    keypair.public().to_peer_id(),
                    TokioExecutor(runtime),
                )
                .build();

                let listen_addr: Multiaddr =
                    format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

                if i == 0 {
                    first_swarm_peer_id_and_addr =
                        Some((keypair.public().to_peer_id(), listen_addr.clone()))
                }

                swarm.listen_on(listen_addr.clone()).unwrap();
                (swarm, listen_addr)
            })
            .collect::<Vec<_>>();

        // For every swarm, the set of peers it still has to discover.
        let mut to_discover = (0..swarms.len())
            .map(|n| {
                (0..swarms.len())
                    // The first swarm is excluded: the others already know it.
                    .skip(1)
                    .filter(|p| *p != n)
                    .map(|p| *Swarm::local_peer_id(&swarms[p].0))
                    .collect::<HashSet<_>>()
            })
            .collect::<Vec<_>>();

        let fut = futures::future::poll_fn(move |cx| {
            // Keep polling all swarms until a full pass produces no event.
            'polling: loop {
                for swarm_n in 0..swarms.len() {
                    match swarms[swarm_n].0.poll_next_unpin(cx) {
                        Poll::Ready(Some(e)) => {
                            match e {
                                SwarmEvent::Behaviour(behavior) => {
                                    match behavior {
                                        DiscoveryOut::UnroutablePeer(other) |
                                        DiscoveryOut::Discovered(other) => {
                                            // Look up the listen address of the reported peer
                                            // and feed it back as a self-reported address.
                                            let addr = swarms
                                                .iter()
                                                .find_map(|(s, a)| {
                                                    if s.behaviour().local_peer_id == other {
                                                        Some(a.clone())
                                                    } else {
                                                        None
                                                    }
                                                })
                                                .unwrap();
                                            // Alternate between advertising only the new
                                            // protocol name and both the legacy and new names.
                                            let protocol_names = if swarm_n % 2 == 0 {
                                                vec![kademlia_protocol_name(genesis_hash, fork_id)]
                                            } else {
                                                vec![
                                                    legacy_kademlia_protocol_name(&protocol_id),
                                                    kademlia_protocol_name(genesis_hash, fork_id),
                                                ]
                                            };
                                            swarms[swarm_n]
                                                .0
                                                .behaviour_mut()
                                                .add_self_reported_address(
                                                    &other,
                                                    protocol_names.as_slice(),
                                                    addr,
                                                );

                                            to_discover[swarm_n].remove(&other);
                                        },
                                        DiscoveryOut::RandomKademliaStarted => {},
                                        e => {
                                            panic!("Unexpected event: {:?}", e)
                                        },
                                    }
                                },
                                // Other swarm events are not relevant for this test.
                                _ => {},
                            }
                            continue 'polling
                        },
                        _ => {},
                    }
                }
                break
            }

            // Done once every swarm has discovered every expected peer.
            if to_discover.iter().all(|l| l.is_empty()) {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        });

        futures::executor::block_on(fut);
    }
1269
    /// Checks that `add_self_reported_address` only adds peers that advertise the local
    /// primary Kademlia protocol name.
    #[test]
    fn discovery_ignores_peers_with_unknown_protocols() {
        let supported_genesis_hash = H256::from_low_u64_be(1);
        let unsupported_genesis_hash = H256::from_low_u64_be(2);
        let supported_protocol_id = ProtocolId::from("a");
        let unsupported_protocol_id = ProtocolId::from("b");

        let mut discovery = {
            let keypair = Keypair::generate_ed25519();
            let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
            config
                .allow_private_ip(true)
                .allow_non_globals_in_dht(true)
                .discovery_limit(50)
                .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
            config.finish()
        };

        // Derives a peer id deterministically from a fixed 32-byte secret.
        let predictable_peer_id = |bytes: &[u8; 32]| {
            Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
        };

        let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
        let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
        let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
        let another_addr: Multiaddr = "/memory/2".parse().unwrap();

        // Try adding remote peers that only advertise unsupported protocols.
        discovery.add_self_reported_address(
            &remote_peer_id,
            &[kademlia_protocol_name(unsupported_genesis_hash, None)],
            remote_addr.clone(),
        );
        discovery.add_self_reported_address(
            &another_peer_id,
            &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
            another_addr.clone(),
        );

        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                kademlia
                    .kbucket(remote_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with unsupported protocol not to be added."
            );
            assert!(
                kademlia
                    .kbucket(another_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with unsupported protocol not to be added."
            );
        }

        // Add the remote peer again, this time advertising the supported protocol.
        discovery.add_self_reported_address(
            &remote_peer_id,
            &[kademlia_protocol_name(supported_genesis_hash, None)],
            remote_addr.clone(),
        );
        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                !kademlia
                    .kbucket(remote_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with supported protocol to be added."
            );
        }

        // Derived from the same secret and address as `another_peer_id` / `another_addr`.
        let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
        let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

        {
            // The peer must be absent before the call below...
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                kademlia
                    .kbucket(unsupported_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect unsupported peer not to be added."
            );
        }
        // Advertising only the legacy name is not enough, even when that legacy name matches
        // the locally configured protocol id.
        discovery.add_self_reported_address(
            &unsupported_peer_id,
            &[legacy_kademlia_protocol_name(&supported_protocol_id)],
            unsupported_peer_addr.clone(),
        );
        {
            // ...and still absent afterwards.
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                kademlia
                    .kbucket(unsupported_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect unsupported peer not to be added."
            );
        }

        // A peer advertising both the legacy and the supported primary name is added.
        discovery.add_self_reported_address(
            &another_peer_id,
            &[
                legacy_kademlia_protocol_name(&supported_protocol_id),
                kademlia_protocol_name(supported_genesis_hash, None),
            ],
            another_addr.clone(),
        );

        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert_eq!(
                2,
                kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
                "Expect peers with supported protocol to be added."
            );
            assert!(
                !kademlia
                    .kbucket(another_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with supported protocol to be added."
            );
        }
    }
1402}