libp2p_identify/
behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
23    num::NonZeroUsize,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use libp2p_core::{
29    multiaddr, multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr,
30};
31use libp2p_identity::{PeerId, PublicKey};
32use libp2p_swarm::{
33    behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
34    ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses,
35    NetworkBehaviour, NotifyHandler, PeerAddresses, StreamUpgradeError, THandler, THandlerInEvent,
36    THandlerOutEvent, ToSwarm, _address_translation,
37};
38
39use crate::{
40    handler::{self, Handler, InEvent},
41    protocol::{Info, UpgradeError},
42};
43
44/// Whether an [`Multiaddr`] is a valid for the QUIC transport.
45fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
46    use Protocol::*;
47    let mut iter = addr.iter();
48    let Some(first) = iter.next() else {
49        return false;
50    };
51    let Some(second) = iter.next() else {
52        return false;
53    };
54    let Some(third) = iter.next() else {
55        return false;
56    };
57    let fourth = iter.next();
58    let fifth = iter.next();
59
60    matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
61        && matches!(second, Udp(_))
62        && if v1 {
63            matches!(third, QuicV1)
64        } else {
65            matches!(third, Quic)
66        }
67        && matches!(fourth, Some(P2p(_)) | None)
68        && fifth.is_none()
69}
70
71fn is_tcp_addr(addr: &Multiaddr) -> bool {
72    use Protocol::*;
73
74    let mut iter = addr.iter();
75
76    let first = match iter.next() {
77        None => return false,
78        Some(p) => p,
79    };
80    let second = match iter.next() {
81        None => return false,
82        Some(p) => p,
83    };
84
85    matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
86}
87
/// Network behaviour that automatically identifies nodes periodically, returns information
/// about them, and answers identify queries from other nodes.
///
/// All external addresses of the local node supposedly observed by remotes
/// are reported via [`ToSwarm::NewExternalAddrCandidate`].
pub struct Behaviour {
    /// The configuration this behaviour was created with.
    config: Config,
    /// For each peer we're connected to, the observed address to send back to it.
    ///
    /// Entries are keyed by connection: for an outbound connection the stored
    /// address is the one we dialed, for an inbound connection it is the
    /// send-back address of the remote.
    connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,

    /// The address a remote observed for us, tracked per connection.
    our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

    /// The outbound connections established without port reuse (require translation)
    outbound_connections_with_ephemeral_port: HashSet<ConnectionId>,

    /// Pending events to be emitted when polled.
    events: VecDeque<ToSwarm<Event, InEvent>>,
    /// The addresses of all peers that we have discovered.
    discovered_peers: PeerCache,

    /// The local listen addresses, kept up to date from swarm events.
    listen_addresses: ListenAddresses,
    /// The local external addresses, kept up to date from swarm events.
    external_addresses: ExternalAddresses,
}
112
/// Configuration for the [`identify::Behaviour`](Behaviour).
///
/// Created via [`Config::new`] and adjusted through the `with_*` builder methods.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Config {
    /// Application-specific version of the protocol family used by the peer,
    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
    protocol_version: String,
    /// The public key of the local node. To report on the wire.
    local_public_key: PublicKey,
    /// Name and version of the local peer implementation, similar to the
    /// `User-Agent` header in the HTTP protocol.
    ///
    /// Defaults to `rust-libp2p/<libp2p-identify-version>`.
    agent_version: String,
    /// The interval at which identification requests are sent to
    /// the remote on established connections after the first request,
    /// i.e. the delay between identification requests.
    ///
    /// Defaults to 5 minutes.
    interval: Duration,

    /// Whether new or expired listen addresses of the local node should
    /// trigger an active push of an identify message to all connected peers.
    ///
    /// Enabling this option can result in connected peers being informed
    /// earlier about new or expired listen addresses of the local node,
    /// i.e. before the next periodic identify request with each peer.
    ///
    /// Disabled by default.
    push_listen_addr_updates: bool,

    /// How many entries of discovered peers to keep before we discard
    /// the least-recently used one.
    ///
    /// A value of `0` disables the cache of discovered peers entirely.
    ///
    /// Defaults to 100.
    cache_size: usize,

    /// Whether to hide our listen addresses in our responses. If enabled,
    /// we will effectively only share our external addresses.
    ///
    /// Disabled by default.
    hide_listen_addrs: bool,
}
156
157impl Config {
158    /// Creates a new configuration for the identify [`Behaviour`] that
159    /// advertises the given protocol version and public key.
160    pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
161        Self {
162            protocol_version,
163            agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
164            local_public_key,
165            interval: Duration::from_secs(5 * 60),
166            push_listen_addr_updates: false,
167            cache_size: 100,
168            hide_listen_addrs: false,
169        }
170    }
171
172    /// Configures the agent version sent to peers.
173    pub fn with_agent_version(mut self, v: String) -> Self {
174        self.agent_version = v;
175        self
176    }
177
178    /// Configures the interval at which identification requests are
179    /// sent to peers after the initial request.
180    pub fn with_interval(mut self, d: Duration) -> Self {
181        self.interval = d;
182        self
183    }
184
185    /// Configures whether new or expired listen addresses of the local
186    /// node should trigger an active push of an identify message to all
187    /// connected peers.
188    pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
189        self.push_listen_addr_updates = b;
190        self
191    }
192
193    /// Configures the size of the LRU cache, caching addresses of discovered peers.
194    pub fn with_cache_size(mut self, cache_size: usize) -> Self {
195        self.cache_size = cache_size;
196        self
197    }
198
199    /// Configures whether we prevent sending out our listen addresses.
200    pub fn with_hide_listen_addrs(mut self, b: bool) -> Self {
201        self.hide_listen_addrs = b;
202        self
203    }
204
205    /// Get the protocol version of the Config.
206    pub fn protocol_version(&self) -> &str {
207        &self.protocol_version
208    }
209
210    /// Get the local public key of the Config.
211    pub fn local_public_key(&self) -> &PublicKey {
212        &self.local_public_key
213    }
214
215    /// Get the agent version of the Config.
216    pub fn agent_version(&self) -> &str {
217        &self.agent_version
218    }
219
220    /// Get the interval of the Config.
221    pub fn interval(&self) -> Duration {
222        self.interval
223    }
224
225    /// Get the push listen address updates boolean value of the Config.
226    pub fn push_listen_addr_updates(&self) -> bool {
227        self.push_listen_addr_updates
228    }
229
230    /// Get the cache size of the Config.
231    pub fn cache_size(&self) -> usize {
232        self.cache_size
233    }
234
235    /// Get the hide listen address boolean value of the Config.
236    pub fn hide_listen_addrs(&self) -> bool {
237        self.hide_listen_addrs
238    }
239}
240
241impl Behaviour {
242    /// Creates a new identify [`Behaviour`].
243    pub fn new(config: Config) -> Self {
244        let discovered_peers = match NonZeroUsize::new(config.cache_size) {
245            None => PeerCache::disabled(),
246            Some(size) => PeerCache::enabled(size),
247        };
248
249        Self {
250            config,
251            connected: HashMap::new(),
252            our_observed_addresses: Default::default(),
253            outbound_connections_with_ephemeral_port: Default::default(),
254            events: VecDeque::new(),
255            discovered_peers,
256            listen_addresses: Default::default(),
257            external_addresses: Default::default(),
258        }
259    }
260
261    /// Initiates an active push of the local peer information to the given peers.
262    pub fn push<I>(&mut self, peers: I)
263    where
264        I: IntoIterator<Item = PeerId>,
265    {
266        for p in peers {
267            if !self.connected.contains_key(&p) {
268                tracing::debug!(peer=%p, "Not pushing to peer because we are not connected");
269                continue;
270            }
271
272            self.events.push_back(ToSwarm::NotifyHandler {
273                peer_id: p,
274                handler: NotifyHandler::Any,
275                event: InEvent::Push,
276            });
277        }
278    }
279
280    fn on_connection_established(
281        &mut self,
282        ConnectionEstablished {
283            peer_id,
284            connection_id: conn,
285            endpoint,
286            failed_addresses,
287            ..
288        }: ConnectionEstablished,
289    ) {
290        let addr = match endpoint {
291            ConnectedPoint::Dialer { address, .. } => address.clone(),
292            ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
293        };
294
295        self.connected
296            .entry(peer_id)
297            .or_default()
298            .insert(conn, addr);
299
300        if let Some(cache) = self.discovered_peers.0.as_mut() {
301            for addr in failed_addresses {
302                cache.remove(&peer_id, addr);
303            }
304        }
305    }
306
307    fn all_addresses(&self) -> HashSet<Multiaddr> {
308        let mut addrs = HashSet::from_iter(self.external_addresses.iter().cloned());
309        if !self.config.hide_listen_addrs {
310            addrs.extend(self.listen_addresses.iter().cloned());
311        };
312        addrs
313    }
314
315    fn emit_new_external_addr_candidate_event(
316        &mut self,
317        connection_id: ConnectionId,
318        observed: &Multiaddr,
319    ) {
320        if self
321            .outbound_connections_with_ephemeral_port
322            .contains(&connection_id)
323        {
324            // Apply address translation to the candidate address.
325            // For TCP without port-reuse, the observed address contains an ephemeral port which
326            // needs to be replaced by the port of a listen address.
327            let translated_addresses = {
328                let mut addrs: Vec<_> = self
329                    .listen_addresses
330                    .iter()
331                    .filter_map(|server| {
332                        if (is_tcp_addr(server) && is_tcp_addr(observed))
333                            || (is_quic_addr(server, true) && is_quic_addr(observed, true))
334                            || (is_quic_addr(server, false) && is_quic_addr(observed, false))
335                        {
336                            _address_translation(server, observed)
337                        } else {
338                            None
339                        }
340                    })
341                    .collect();
342
343                // remove duplicates
344                addrs.sort_unstable();
345                addrs.dedup();
346                addrs
347            };
348
349            // If address translation yielded nothing, broadcast the original candidate address.
350            if translated_addresses.is_empty() {
351                self.events
352                    .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
353            } else {
354                for addr in translated_addresses {
355                    self.events
356                        .push_back(ToSwarm::NewExternalAddrCandidate(addr));
357                }
358            }
359            return;
360        }
361
362        // outgoing connection dialed with port reuse
363        // incoming connection
364        self.events
365            .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
366    }
367}
368
369impl NetworkBehaviour for Behaviour {
370    type ConnectionHandler = Handler;
371    type ToSwarm = Event;
372
373    fn handle_established_inbound_connection(
374        &mut self,
375        _: ConnectionId,
376        peer: PeerId,
377        _: &Multiaddr,
378        remote_addr: &Multiaddr,
379    ) -> Result<THandler<Self>, ConnectionDenied> {
380        Ok(Handler::new(
381            self.config.interval,
382            peer,
383            self.config.local_public_key.clone(),
384            self.config.protocol_version.clone(),
385            self.config.agent_version.clone(),
386            remote_addr.clone(),
387            self.all_addresses(),
388        ))
389    }
390
391    fn handle_established_outbound_connection(
392        &mut self,
393        connection_id: ConnectionId,
394        peer: PeerId,
395        addr: &Multiaddr,
396        _: Endpoint,
397        port_use: PortUse,
398    ) -> Result<THandler<Self>, ConnectionDenied> {
399        // Contrary to inbound events, outbound events are full-p2p qualified
400        // so we remove /p2p/ in order to be homogeneous
401        // this will avoid Autonatv2 to probe twice the same address (fully-p2p-qualified + not
402        // fully-p2p-qualified)
403        let mut addr = addr.clone();
404        if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
405            addr.pop();
406        }
407
408        if port_use == PortUse::New {
409            self.outbound_connections_with_ephemeral_port
410                .insert(connection_id);
411        }
412
413        Ok(Handler::new(
414            self.config.interval,
415            peer,
416            self.config.local_public_key.clone(),
417            self.config.protocol_version.clone(),
418            self.config.agent_version.clone(),
419            // TODO: This is weird? That is the public address we dialed,
420            // shouldn't need to tell the other party?
421            addr.clone(),
422            self.all_addresses(),
423        ))
424    }
425
426    fn on_connection_handler_event(
427        &mut self,
428        peer_id: PeerId,
429        connection_id: ConnectionId,
430        event: THandlerOutEvent<Self>,
431    ) {
432        match event {
433            handler::Event::Identified(mut info) => {
434                // Remove invalid multiaddrs.
435                info.listen_addrs
436                    .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
437
438                let observed = info.observed_addr.clone();
439                self.events
440                    .push_back(ToSwarm::GenerateEvent(Event::Received {
441                        connection_id,
442                        peer_id,
443                        info: info.clone(),
444                    }));
445
446                if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
447                    for address in &info.listen_addrs {
448                        if discovered_peers.add(peer_id, address.clone()) {
449                            self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
450                                peer_id,
451                                address: address.clone(),
452                            });
453                        }
454                    }
455                }
456
457                match self.our_observed_addresses.entry(connection_id) {
458                    Entry::Vacant(not_yet_observed) => {
459                        not_yet_observed.insert(observed.clone());
460                        self.emit_new_external_addr_candidate_event(connection_id, &observed);
461                    }
462                    Entry::Occupied(already_observed) if already_observed.get() == &observed => {
463                        // No-op, we already observed this address.
464                    }
465                    Entry::Occupied(mut already_observed) => {
466                        tracing::info!(
467                            old_address=%already_observed.get(),
468                            new_address=%observed,
469                            "Our observed address on connection {connection_id} changed",
470                        );
471
472                        *already_observed.get_mut() = observed.clone();
473                        self.emit_new_external_addr_candidate_event(connection_id, &observed);
474                    }
475                }
476            }
477            handler::Event::Identification => {
478                self.events.push_back(ToSwarm::GenerateEvent(Event::Sent {
479                    connection_id,
480                    peer_id,
481                }));
482            }
483            handler::Event::IdentificationPushed(info) => {
484                self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed {
485                    connection_id,
486                    peer_id,
487                    info,
488                }));
489            }
490            handler::Event::IdentificationError(error) => {
491                self.events.push_back(ToSwarm::GenerateEvent(Event::Error {
492                    connection_id,
493                    peer_id,
494                    error,
495                }));
496            }
497        }
498    }
499
500    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
501    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
502        if let Some(event) = self.events.pop_front() {
503            return Poll::Ready(event);
504        }
505
506        Poll::Pending
507    }
508
509    fn handle_pending_outbound_connection(
510        &mut self,
511        _connection_id: ConnectionId,
512        maybe_peer: Option<PeerId>,
513        _addresses: &[Multiaddr],
514        _effective_role: Endpoint,
515    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
516        let peer = match maybe_peer {
517            None => return Ok(vec![]),
518            Some(peer) => peer,
519        };
520
521        Ok(self.discovered_peers.get(&peer))
522    }
523
524    fn on_swarm_event(&mut self, event: FromSwarm) {
525        let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
526        let external_addr_changed = self.external_addresses.on_swarm_event(&event);
527
528        if listen_addr_changed || external_addr_changed {
529            // notify all connected handlers about our changed addresses
530            let change_events = self
531                .connected
532                .iter()
533                .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
534                .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
535                    peer_id,
536                    handler: NotifyHandler::One(*connection_id),
537                    event: InEvent::AddressesChanged(self.all_addresses()),
538                })
539                .collect::<Vec<_>>();
540
541            self.events.extend(change_events)
542        }
543
544        if listen_addr_changed && self.config.push_listen_addr_updates {
545            // trigger an identify push for all connected peers
546            let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
547                peer_id: *peer,
548                handler: NotifyHandler::Any,
549                event: InEvent::Push,
550            });
551
552            self.events.extend(push_events);
553        }
554
555        match event {
556            FromSwarm::ConnectionEstablished(connection_established) => {
557                self.on_connection_established(connection_established)
558            }
559            FromSwarm::ConnectionClosed(ConnectionClosed {
560                peer_id,
561                connection_id,
562                remaining_established,
563                ..
564            }) => {
565                if remaining_established == 0 {
566                    self.connected.remove(&peer_id);
567                } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
568                    addrs.remove(&connection_id);
569                }
570
571                self.our_observed_addresses.remove(&connection_id);
572                self.outbound_connections_with_ephemeral_port
573                    .remove(&connection_id);
574            }
575            FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
576                if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
577                    (peer_id, self.discovered_peers.0.as_mut(), error)
578                {
579                    for (addr, _error) in errors {
580                        cache.remove(&peer_id, addr);
581                    }
582                }
583            }
584            _ => {}
585        }
586    }
587}
588
/// Event emitted by the identify [`Behaviour`].
///
/// Every variant carries the [`ConnectionId`] and [`PeerId`] it relates to.
// NOTE(review): presumably allowed because the variants carrying `Info` are much
// larger than the others — confirm before removing the lint exception.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Event {
    /// Identification information has been received from a peer.
    Received {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// The peer that has been identified.
        peer_id: PeerId,
        /// The information provided by the peer.
        info: Info,
    },
    /// Identification information of the local node has been sent to a peer in
    /// response to an identification request.
    Sent {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// The peer that the information has been sent to.
        peer_id: PeerId,
    },
    /// Identification information of the local node has been actively pushed to
    /// a peer.
    Pushed {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// The peer that the information has been sent to.
        peer_id: PeerId,
        /// The full Info struct we pushed to the remote peer. Clients must
        /// do some diff'ing to know what has changed since the last push.
        info: Info,
    },
    /// Error while attempting to identify the remote.
    Error {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// The peer with whom the error originated.
        peer_id: PeerId,
        /// The error that occurred.
        error: StreamUpgradeError<UpgradeError>,
    },
}
631
632impl Event {
633    pub fn connection_id(&self) -> ConnectionId {
634        match self {
635            Event::Received { connection_id, .. }
636            | Event::Sent { connection_id, .. }
637            | Event::Pushed { connection_id, .. }
638            | Event::Error { connection_id, .. } => *connection_id,
639        }
640    }
641}
642
643/// If there is a given peer_id in the multiaddr, make sure it is the same as
644/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
645fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
646    let last_component = addr.iter().last();
647    if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
648        return multi_addr_peer_id == *peer_id;
649    }
650    true
651}
652
653struct PeerCache(Option<PeerAddresses>);
654
655impl PeerCache {
656    fn disabled() -> Self {
657        Self(None)
658    }
659
660    fn enabled(size: NonZeroUsize) -> Self {
661        Self(Some(PeerAddresses::new(size)))
662    }
663
664    fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
665        if let Some(cache) = self.0.as_mut() {
666            cache.get(peer).collect()
667        } else {
668            Vec::new()
669        }
670    }
671}
672
#[cfg(test)]
mod tests {
    use super::*;

    /// A `/p2p` suffix must name the expected peer; an address without a
    /// `/p2p` suffix is always accepted.
    #[test]
    fn check_multiaddr_matches_peer_id() {
        let peer_id = PeerId::random();
        let other_peer_id = PeerId::random();

        let addr_without_peer_id: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
            .parse()
            .expect("failed to parse multiaddr");
        let addr = addr_without_peer_id
            .clone()
            .with(multiaddr::Protocol::P2p(peer_id));
        let addr_with_other_peer_id = addr_without_peer_id
            .clone()
            .with(multiaddr::Protocol::P2p(other_peer_id));

        let matches_expected_peer = |candidate: &Multiaddr| {
            multiaddr_matches_peer_id(candidate, &peer_id)
        };

        assert!(matches_expected_peer(&addr));
        assert!(!matches_expected_peer(&addr_with_other_peer_id));
        assert!(matches_expected_peer(&addr_without_peer_id));
    }
}