1use std::{
22 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
23 num::NonZeroUsize,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use libp2p_core::{
29 multiaddr, multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr,
30};
31use libp2p_identity::{PeerId, PublicKey};
32use libp2p_swarm::{
33 behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
34 ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses,
35 NetworkBehaviour, NotifyHandler, PeerAddresses, StreamUpgradeError, THandler, THandlerInEvent,
36 THandlerOutEvent, ToSwarm, _address_translation,
37};
38
39use crate::{
40 handler::{self, Handler, InEvent},
41 protocol::{Info, UpgradeError},
42};
43
44fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
46 use Protocol::*;
47 let mut iter = addr.iter();
48 let Some(first) = iter.next() else {
49 return false;
50 };
51 let Some(second) = iter.next() else {
52 return false;
53 };
54 let Some(third) = iter.next() else {
55 return false;
56 };
57 let fourth = iter.next();
58 let fifth = iter.next();
59
60 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
61 && matches!(second, Udp(_))
62 && if v1 {
63 matches!(third, QuicV1)
64 } else {
65 matches!(third, Quic)
66 }
67 && matches!(fourth, Some(P2p(_)) | None)
68 && fifth.is_none()
69}
70
71fn is_tcp_addr(addr: &Multiaddr) -> bool {
72 use Protocol::*;
73
74 let mut iter = addr.iter();
75
76 let first = match iter.next() {
77 None => return false,
78 Some(p) => p,
79 };
80 let second = match iter.next() {
81 None => return false,
82 Some(p) => p,
83 };
84
85 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
86}
87
88pub struct Behaviour {
94 config: Config,
95 connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
97
98 our_observed_addresses: HashMap<ConnectionId, Multiaddr>,
100
101 outbound_connections_with_ephemeral_port: HashSet<ConnectionId>,
103
104 events: VecDeque<ToSwarm<Event, InEvent>>,
106 discovered_peers: PeerCache,
108
109 listen_addresses: ListenAddresses,
110 external_addresses: ExternalAddresses,
111}
112
113#[non_exhaustive]
115#[derive(Debug, Clone)]
116pub struct Config {
117 protocol_version: String,
120 local_public_key: PublicKey,
122 agent_version: String,
127 interval: Duration,
133
134 push_listen_addr_updates: bool,
143
144 cache_size: usize,
149
150 hide_listen_addrs: bool,
155}
156
157impl Config {
158 pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
161 Self {
162 protocol_version,
163 agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
164 local_public_key,
165 interval: Duration::from_secs(5 * 60),
166 push_listen_addr_updates: false,
167 cache_size: 100,
168 hide_listen_addrs: false,
169 }
170 }
171
172 pub fn with_agent_version(mut self, v: String) -> Self {
174 self.agent_version = v;
175 self
176 }
177
178 pub fn with_interval(mut self, d: Duration) -> Self {
181 self.interval = d;
182 self
183 }
184
185 pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
189 self.push_listen_addr_updates = b;
190 self
191 }
192
193 pub fn with_cache_size(mut self, cache_size: usize) -> Self {
195 self.cache_size = cache_size;
196 self
197 }
198
199 pub fn with_hide_listen_addrs(mut self, b: bool) -> Self {
201 self.hide_listen_addrs = b;
202 self
203 }
204
205 pub fn protocol_version(&self) -> &str {
207 &self.protocol_version
208 }
209
210 pub fn local_public_key(&self) -> &PublicKey {
212 &self.local_public_key
213 }
214
215 pub fn agent_version(&self) -> &str {
217 &self.agent_version
218 }
219
220 pub fn interval(&self) -> Duration {
222 self.interval
223 }
224
225 pub fn push_listen_addr_updates(&self) -> bool {
227 self.push_listen_addr_updates
228 }
229
230 pub fn cache_size(&self) -> usize {
232 self.cache_size
233 }
234
235 pub fn hide_listen_addrs(&self) -> bool {
237 self.hide_listen_addrs
238 }
239}
240
241impl Behaviour {
242 pub fn new(config: Config) -> Self {
244 let discovered_peers = match NonZeroUsize::new(config.cache_size) {
245 None => PeerCache::disabled(),
246 Some(size) => PeerCache::enabled(size),
247 };
248
249 Self {
250 config,
251 connected: HashMap::new(),
252 our_observed_addresses: Default::default(),
253 outbound_connections_with_ephemeral_port: Default::default(),
254 events: VecDeque::new(),
255 discovered_peers,
256 listen_addresses: Default::default(),
257 external_addresses: Default::default(),
258 }
259 }
260
261 pub fn push<I>(&mut self, peers: I)
263 where
264 I: IntoIterator<Item = PeerId>,
265 {
266 for p in peers {
267 if !self.connected.contains_key(&p) {
268 tracing::debug!(peer=%p, "Not pushing to peer because we are not connected");
269 continue;
270 }
271
272 self.events.push_back(ToSwarm::NotifyHandler {
273 peer_id: p,
274 handler: NotifyHandler::Any,
275 event: InEvent::Push,
276 });
277 }
278 }
279
280 fn on_connection_established(
281 &mut self,
282 ConnectionEstablished {
283 peer_id,
284 connection_id: conn,
285 endpoint,
286 failed_addresses,
287 ..
288 }: ConnectionEstablished,
289 ) {
290 let addr = match endpoint {
291 ConnectedPoint::Dialer { address, .. } => address.clone(),
292 ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
293 };
294
295 self.connected
296 .entry(peer_id)
297 .or_default()
298 .insert(conn, addr);
299
300 if let Some(cache) = self.discovered_peers.0.as_mut() {
301 for addr in failed_addresses {
302 cache.remove(&peer_id, addr);
303 }
304 }
305 }
306
307 fn all_addresses(&self) -> HashSet<Multiaddr> {
308 let mut addrs = HashSet::from_iter(self.external_addresses.iter().cloned());
309 if !self.config.hide_listen_addrs {
310 addrs.extend(self.listen_addresses.iter().cloned());
311 };
312 addrs
313 }
314
315 fn emit_new_external_addr_candidate_event(
316 &mut self,
317 connection_id: ConnectionId,
318 observed: &Multiaddr,
319 ) {
320 if self
321 .outbound_connections_with_ephemeral_port
322 .contains(&connection_id)
323 {
324 let translated_addresses = {
328 let mut addrs: Vec<_> = self
329 .listen_addresses
330 .iter()
331 .filter_map(|server| {
332 if (is_tcp_addr(server) && is_tcp_addr(observed))
333 || (is_quic_addr(server, true) && is_quic_addr(observed, true))
334 || (is_quic_addr(server, false) && is_quic_addr(observed, false))
335 {
336 _address_translation(server, observed)
337 } else {
338 None
339 }
340 })
341 .collect();
342
343 addrs.sort_unstable();
345 addrs.dedup();
346 addrs
347 };
348
349 if translated_addresses.is_empty() {
351 self.events
352 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
353 } else {
354 for addr in translated_addresses {
355 self.events
356 .push_back(ToSwarm::NewExternalAddrCandidate(addr));
357 }
358 }
359 return;
360 }
361
362 self.events
365 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
366 }
367}
368
369impl NetworkBehaviour for Behaviour {
370 type ConnectionHandler = Handler;
371 type ToSwarm = Event;
372
373 fn handle_established_inbound_connection(
374 &mut self,
375 _: ConnectionId,
376 peer: PeerId,
377 _: &Multiaddr,
378 remote_addr: &Multiaddr,
379 ) -> Result<THandler<Self>, ConnectionDenied> {
380 Ok(Handler::new(
381 self.config.interval,
382 peer,
383 self.config.local_public_key.clone(),
384 self.config.protocol_version.clone(),
385 self.config.agent_version.clone(),
386 remote_addr.clone(),
387 self.all_addresses(),
388 ))
389 }
390
391 fn handle_established_outbound_connection(
392 &mut self,
393 connection_id: ConnectionId,
394 peer: PeerId,
395 addr: &Multiaddr,
396 _: Endpoint,
397 port_use: PortUse,
398 ) -> Result<THandler<Self>, ConnectionDenied> {
399 let mut addr = addr.clone();
404 if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
405 addr.pop();
406 }
407
408 if port_use == PortUse::New {
409 self.outbound_connections_with_ephemeral_port
410 .insert(connection_id);
411 }
412
413 Ok(Handler::new(
414 self.config.interval,
415 peer,
416 self.config.local_public_key.clone(),
417 self.config.protocol_version.clone(),
418 self.config.agent_version.clone(),
419 addr.clone(),
422 self.all_addresses(),
423 ))
424 }
425
426 fn on_connection_handler_event(
427 &mut self,
428 peer_id: PeerId,
429 connection_id: ConnectionId,
430 event: THandlerOutEvent<Self>,
431 ) {
432 match event {
433 handler::Event::Identified(mut info) => {
434 info.listen_addrs
436 .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
437
438 let observed = info.observed_addr.clone();
439 self.events
440 .push_back(ToSwarm::GenerateEvent(Event::Received {
441 connection_id,
442 peer_id,
443 info: info.clone(),
444 }));
445
446 if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
447 for address in &info.listen_addrs {
448 if discovered_peers.add(peer_id, address.clone()) {
449 self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
450 peer_id,
451 address: address.clone(),
452 });
453 }
454 }
455 }
456
457 match self.our_observed_addresses.entry(connection_id) {
458 Entry::Vacant(not_yet_observed) => {
459 not_yet_observed.insert(observed.clone());
460 self.emit_new_external_addr_candidate_event(connection_id, &observed);
461 }
462 Entry::Occupied(already_observed) if already_observed.get() == &observed => {
463 }
465 Entry::Occupied(mut already_observed) => {
466 tracing::info!(
467 old_address=%already_observed.get(),
468 new_address=%observed,
469 "Our observed address on connection {connection_id} changed",
470 );
471
472 *already_observed.get_mut() = observed.clone();
473 self.emit_new_external_addr_candidate_event(connection_id, &observed);
474 }
475 }
476 }
477 handler::Event::Identification => {
478 self.events.push_back(ToSwarm::GenerateEvent(Event::Sent {
479 connection_id,
480 peer_id,
481 }));
482 }
483 handler::Event::IdentificationPushed(info) => {
484 self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed {
485 connection_id,
486 peer_id,
487 info,
488 }));
489 }
490 handler::Event::IdentificationError(error) => {
491 self.events.push_back(ToSwarm::GenerateEvent(Event::Error {
492 connection_id,
493 peer_id,
494 error,
495 }));
496 }
497 }
498 }
499
500 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
501 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
502 if let Some(event) = self.events.pop_front() {
503 return Poll::Ready(event);
504 }
505
506 Poll::Pending
507 }
508
509 fn handle_pending_outbound_connection(
510 &mut self,
511 _connection_id: ConnectionId,
512 maybe_peer: Option<PeerId>,
513 _addresses: &[Multiaddr],
514 _effective_role: Endpoint,
515 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
516 let peer = match maybe_peer {
517 None => return Ok(vec![]),
518 Some(peer) => peer,
519 };
520
521 Ok(self.discovered_peers.get(&peer))
522 }
523
524 fn on_swarm_event(&mut self, event: FromSwarm) {
525 let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
526 let external_addr_changed = self.external_addresses.on_swarm_event(&event);
527
528 if listen_addr_changed || external_addr_changed {
529 let change_events = self
531 .connected
532 .iter()
533 .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
534 .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
535 peer_id,
536 handler: NotifyHandler::One(*connection_id),
537 event: InEvent::AddressesChanged(self.all_addresses()),
538 })
539 .collect::<Vec<_>>();
540
541 self.events.extend(change_events)
542 }
543
544 if listen_addr_changed && self.config.push_listen_addr_updates {
545 let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
547 peer_id: *peer,
548 handler: NotifyHandler::Any,
549 event: InEvent::Push,
550 });
551
552 self.events.extend(push_events);
553 }
554
555 match event {
556 FromSwarm::ConnectionEstablished(connection_established) => {
557 self.on_connection_established(connection_established)
558 }
559 FromSwarm::ConnectionClosed(ConnectionClosed {
560 peer_id,
561 connection_id,
562 remaining_established,
563 ..
564 }) => {
565 if remaining_established == 0 {
566 self.connected.remove(&peer_id);
567 } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
568 addrs.remove(&connection_id);
569 }
570
571 self.our_observed_addresses.remove(&connection_id);
572 self.outbound_connections_with_ephemeral_port
573 .remove(&connection_id);
574 }
575 FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
576 if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
577 (peer_id, self.discovered_peers.0.as_mut(), error)
578 {
579 for (addr, _error) in errors {
580 cache.remove(&peer_id, addr);
581 }
582 }
583 }
584 _ => {}
585 }
586 }
587}
588
589#[allow(clippy::large_enum_variant)]
591#[derive(Debug)]
592pub enum Event {
593 Received {
595 connection_id: ConnectionId,
597 peer_id: PeerId,
599 info: Info,
601 },
602 Sent {
605 connection_id: ConnectionId,
607 peer_id: PeerId,
609 },
610 Pushed {
613 connection_id: ConnectionId,
615 peer_id: PeerId,
617 info: Info,
620 },
621 Error {
623 connection_id: ConnectionId,
625 peer_id: PeerId,
627 error: StreamUpgradeError<UpgradeError>,
629 },
630}
631
632impl Event {
633 pub fn connection_id(&self) -> ConnectionId {
634 match self {
635 Event::Received { connection_id, .. }
636 | Event::Sent { connection_id, .. }
637 | Event::Pushed { connection_id, .. }
638 | Event::Error { connection_id, .. } => *connection_id,
639 }
640 }
641}
642
643fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
646 let last_component = addr.iter().last();
647 if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
648 return multi_addr_peer_id == *peer_id;
649 }
650 true
651}
652
653struct PeerCache(Option<PeerAddresses>);
654
655impl PeerCache {
656 fn disabled() -> Self {
657 Self(None)
658 }
659
660 fn enabled(size: NonZeroUsize) -> Self {
661 Self(Some(PeerAddresses::new(size)))
662 }
663
664 fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
665 if let Some(cache) = self.0.as_mut() {
666 cache.get(peer).collect()
667 } else {
668 Vec::new()
669 }
670 }
671}
672
673#[cfg(test)]
674mod tests {
675 use super::*;
676
677 #[test]
678 fn check_multiaddr_matches_peer_id() {
679 let peer_id = PeerId::random();
680 let other_peer_id = PeerId::random();
681 let mut addr: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
682 .parse()
683 .expect("failed to parse multiaddr");
684
685 let addr_without_peer_id: Multiaddr = addr.clone();
686 let mut addr_with_other_peer_id = addr.clone();
687
688 addr.push(multiaddr::Protocol::P2p(peer_id));
689 addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id));
690
691 assert!(multiaddr_matches_peer_id(&addr, &peer_id));
692 assert!(!multiaddr_matches_peer_id(
693 &addr_with_other_peer_id,
694 &peer_id
695 ));
696 assert!(multiaddr_matches_peer_id(&addr_without_peer_id, &peer_id));
697 }
698}