libp2p_swarm/behaviour/
peer_addresses.rs

1use std::num::NonZeroUsize;
2
3use libp2p_core::Multiaddr;
4use libp2p_identity::PeerId;
5use lru::LruCache;
6
7use crate::{behaviour::FromSwarm, DialError, DialFailure, NewExternalAddrOfPeer};
8
9/// Struct for tracking peers' external addresses of the [`Swarm`](crate::Swarm).
10#[derive(Debug)]
11pub struct PeerAddresses(LruCache<PeerId, LruCache<Multiaddr, ()>>);
12
13impl PeerAddresses {
14    /// Creates a [`PeerAddresses`] cache with capacity for the given number of peers.
15    ///
16    /// For each peer, we will at most store 10 addresses.
17    pub fn new(number_of_peers: NonZeroUsize) -> Self {
18        Self(LruCache::new(number_of_peers))
19    }
20
21    /// Feed a [`FromSwarm`] event to this struct.
22    ///
23    /// Returns whether the event changed peer's known external addresses.
24    pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool {
25        match event {
26            FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { peer_id, addr }) => {
27                self.add(*peer_id, (*addr).clone())
28            }
29            FromSwarm::DialFailure(DialFailure {
30                peer_id: Some(peer_id),
31                error: DialError::Transport(errors),
32                ..
33            }) => {
34                for (addr, _error) in errors {
35                    self.remove(peer_id, addr);
36                }
37                true
38            }
39            _ => false,
40        }
41    }
42
43    /// Adds address to cache.
44    /// Appends address to the existing set if peer addresses already exist.
45    /// Creates a new cache entry for peer_id if no addresses are present.
46    /// Returns true if the newly added address was not previously in the cache.
47    pub fn add(&mut self, peer: PeerId, address: Multiaddr) -> bool {
48        match prepare_addr(&peer, &address) {
49            Ok(address) => {
50                if let Some(cached) = self.0.get_mut(&peer) {
51                    cached.put(address, ()).is_none()
52                } else {
53                    let mut set = LruCache::new(NonZeroUsize::new(10).expect("10 > 0"));
54                    set.put(address, ());
55                    self.0.put(peer, set);
56
57                    true
58                }
59            }
60            Err(_) => false,
61        }
62    }
63
64    /// Returns peer's external addresses.
65    pub fn get(&mut self, peer: &PeerId) -> impl Iterator<Item = Multiaddr> + '_ {
66        self.0
67            .get(peer)
68            .into_iter()
69            .flat_map(|c| c.iter().map(|(m, ())| m))
70            .cloned()
71    }
72
73    /// Removes address from peer addresses cache.
74    /// Returns true if the address was removed.
75    pub fn remove(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
76        match self.0.get_mut(peer) {
77            Some(addrs) => match prepare_addr(peer, address) {
78                Ok(address) => addrs.pop(&address).is_some(),
79                Err(_) => false,
80            },
81            None => false,
82        }
83    }
84}
85
86fn prepare_addr(peer: &PeerId, addr: &Multiaddr) -> Result<Multiaddr, Multiaddr> {
87    addr.clone().with_p2p(*peer)
88}
89
90impl Default for PeerAddresses {
91    fn default() -> Self {
92        Self(LruCache::new(NonZeroUsize::new(100).unwrap()))
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::io;

    use libp2p_core::{
        multiaddr::Protocol,
        transport::{memory::MemoryTransportError, TransportError},
    };
    use once_cell::sync::Lazy;

    use super::*;
    use crate::ConnectionId;

    /// Feeding the same new-address event twice only reports a change the first time.
    #[test]
    fn new_peer_addr_returns_correct_changed_value() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();

        let event = new_external_addr_of_peer1(peer_id);

        let changed = cache.on_swarm_event(&event);
        assert!(changed);

        let changed = cache.on_swarm_event(&event);
        assert!(!changed);
    }

    /// Distinct addresses reported for the same peer accumulate in the cache.
    #[test]
    fn new_peer_addr_saves_peer_addrs() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();
        let addr2 = MEMORY_ADDR_2000.clone().with_p2p(peer_id).unwrap();

        let event = new_external_addr_of_peer1(peer_id);
        let changed = cache.on_swarm_event(&event);
        assert!(changed);

        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert_eq!(cached, vec![addr1.clone()]);

        let event = new_external_addr_of_peer2(peer_id);
        let changed = cache.on_swarm_event(&event);
        assert!(changed);

        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert_eq!(cached.len(), 2);
        assert!(cached.contains(&addr1));
        assert!(cached.contains(&addr2));
    }

    /// Re-reporting a known address neither duplicates it nor counts as a change.
    #[test]
    fn existing_addr_is_not_added_to_cache() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();

        let event = new_external_addr_of_peer1(peer_id);
        let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap();

        let changed = cache.on_swarm_event(&event);
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert!(changed);
        assert_eq!(cached, vec![addr1.clone()]);

        let changed = cache.on_swarm_event(&event);
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        assert!(!changed);
        assert_eq!(cached, [addr1]);
    }

    /// A transport dial failure evicts exactly the addresses listed in the error.
    #[test]
    fn addresses_of_peer_are_removed_when_received_dial_failure() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();

        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();

        cache.add(peer_id, addr.clone());
        cache.add(peer_id, addr2.clone());
        cache.add(peer_id, addr3.clone());

        let error = DialError::Transport(prepare_errors(vec![addr, addr3]));

        let event = FromSwarm::DialFailure(DialFailure {
            peer_id: Some(peer_id),
            error: &error,
            connection_id: ConnectionId::new_unchecked(8),
        });

        let changed = cache.on_swarm_event(&event);

        assert!(changed);

        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        let expected = prepare_expected_addrs(peer_id, [addr2].into_iter());

        assert_eq!(cached, expected);
    }

    #[test]
    fn remove_removes_address_if_present() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();

        cache.add(peer_id, addr.clone());

        assert!(cache.remove(&peer_id, &addr));
    }

    /// The peer is known, but the address being removed was never stored for it.
    #[test]
    fn remove_returns_false_if_address_not_present() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let known: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let unknown: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();

        // Without this the test would only exercise the "peer not present" path.
        cache.add(peer_id, known.clone());

        assert!(!cache.remove(&peer_id, &unknown));

        // The address that is present must be left untouched.
        let cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        let expected = prepare_expected_addrs(peer_id, [known].into_iter());
        assert_eq!(cached, expected);
    }

    /// Nothing at all has been stored for the peer.
    #[test]
    fn remove_returns_false_if_peer_not_present() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();

        assert!(!cache.remove(&peer_id, &addr));
    }

    #[test]
    fn remove_removes_address_provided_in_param() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();

        cache.add(peer_id, addr.clone());
        cache.add(peer_id, addr2.clone());
        cache.add(peer_id, addr3.clone());

        assert!(cache.remove(&peer_id, &addr2));

        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        cached.sort();

        let expected = prepare_expected_addrs(peer_id, [addr, addr3].into_iter());

        assert_eq!(cached, expected);
    }

    #[test]
    fn add_adds_new_address_to_cache() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();

        assert!(cache.add(peer_id, addr.clone()));

        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        cached.sort();
        let expected = prepare_expected_addrs(peer_id, [addr].into_iter());

        assert_eq!(cached, expected);
    }

    #[test]
    fn add_adds_address_to_cache_to_existing_key() {
        let mut cache = PeerAddresses::default();
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap();

        assert!(cache.add(peer_id, addr.clone()));

        cache.add(peer_id, addr2.clone());
        cache.add(peer_id, addr3.clone());

        let expected = prepare_expected_addrs(peer_id, [addr, addr2, addr3].into_iter());

        let mut cached = cache.get(&peer_id).collect::<Vec<Multiaddr>>();
        cached.sort();

        assert_eq!(cached, expected);
    }

    /// Builds the sorted list of addresses the cache is expected to hold for `peer_id`,
    /// i.e. each input address passed through `with_p2p`.
    fn prepare_expected_addrs(
        peer_id: PeerId,
        addrs: impl Iterator<Item = Multiaddr>,
    ) -> Vec<Multiaddr> {
        let mut addrs = addrs
            .filter_map(|a| a.with_p2p(peer_id).ok())
            .collect::<Vec<Multiaddr>>();
        addrs.sort();
        addrs
    }

    /// New-address event for `peer_id` carrying [`MEMORY_ADDR_1000`].
    fn new_external_addr_of_peer1(peer_id: PeerId) -> FromSwarm<'static> {
        FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
            peer_id,
            addr: &MEMORY_ADDR_1000,
        })
    }

    /// New-address event for `peer_id` carrying [`MEMORY_ADDR_2000`].
    fn new_external_addr_of_peer2(peer_id: PeerId) -> FromSwarm<'static> {
        FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
            peer_id,
            addr: &MEMORY_ADDR_2000,
        })
    }

    /// Pairs every address with an "unreachable" transport error, in the shape carried by
    /// `DialError::Transport`.
    fn prepare_errors(addrs: Vec<Multiaddr>) -> Vec<(Multiaddr, TransportError<io::Error>)> {
        addrs
            .into_iter()
            .map(|addr| {
                (
                    addr,
                    TransportError::Other(io::Error::new(
                        io::ErrorKind::Other,
                        MemoryTransportError::Unreachable,
                    )),
                )
            })
            .collect()
    }

    /// In-memory transport address on port 1000, without a `/p2p` component.
    static MEMORY_ADDR_1000: Lazy<Multiaddr> =
        Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(1000)));
    /// In-memory transport address on port 2000, without a `/p2p` component.
    static MEMORY_ADDR_2000: Lazy<Multiaddr> =
        Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(2000)));
}