libp2p_rendezvous/
client.rs

1// Copyright 2021 COMIT Network.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::HashMap,
23    iter,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use futures::{
29    future::{BoxFuture, FutureExt},
30    stream::{FuturesUnordered, StreamExt},
31};
32use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord};
33use libp2p_identity::{Keypair, PeerId, SigningError};
34use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
35use libp2p_swarm::{
36    ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
37    THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
40use crate::codec::{
41    Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl,
42};
43
44pub struct Behaviour {
45    inner: libp2p_request_response::Behaviour<crate::codec::Codec>,
46
47    keypair: Keypair,
48
49    waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
50    waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,
51
52    /// Hold addresses of all peers that we have discovered so far.
53    ///
54    /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by
55    /// returning addresses from [`NetworkBehaviour::handle_pending_outbound_connection`].
56    discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
57
58    registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,
59
60    /// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers`
61    /// otherwise we have a memory leak.
62    expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
63
64    external_addresses: ExternalAddresses,
65}
66
67impl Behaviour {
68    /// Create a new instance of the rendezvous [`NetworkBehaviour`].
69    pub fn new(keypair: Keypair) -> Self {
70        Self {
71            inner: libp2p_request_response::Behaviour::with_codec(
72                crate::codec::Codec::default(),
73                iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
74                libp2p_request_response::Config::default(),
75            ),
76            keypair,
77            waiting_for_register: Default::default(),
78            waiting_for_discovery: Default::default(),
79            discovered_peers: Default::default(),
80            registered_namespaces: Default::default(),
81            expiring_registrations: FuturesUnordered::from_iter(vec![
82                futures::future::pending().boxed()
83            ]),
84            external_addresses: Default::default(),
85        }
86    }
87
88    /// Register our external addresses in the given namespace with the given rendezvous peer.
89    ///
90    /// External addresses are either manually added via
91    /// [`libp2p_swarm::Swarm::add_external_address`] or reported by other [`NetworkBehaviour`]s
92    /// via [`ToSwarm::ExternalAddrConfirmed`].
93    pub fn register(
94        &mut self,
95        namespace: Namespace,
96        rendezvous_node: PeerId,
97        ttl: Option<Ttl>,
98    ) -> Result<(), RegisterError> {
99        let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
100        if external_addresses.is_empty() {
101            return Err(RegisterError::NoExternalAddresses);
102        }
103
104        let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
105        let req_id = self.inner.send_request(
106            &rendezvous_node,
107            Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
108        );
109        self.waiting_for_register
110            .insert(req_id, (rendezvous_node, namespace));
111
112        Ok(())
113    }
114
115    /// Unregister ourselves from the given namespace with the given rendezvous peer.
116    pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
117        self.registered_namespaces
118            .retain(|(rz_node, ns), _| rz_node.ne(&rendezvous_node) && ns.ne(&namespace));
119
120        self.inner
121            .send_request(&rendezvous_node, Unregister(namespace));
122    }
123
124    /// Discover other peers at a given rendezvous peer.
125    ///
126    /// If desired, the registrations can be filtered by a namespace.
127    /// If no namespace is given, peers from all namespaces will be returned.
128    /// A successfully discovery returns a cookie within [`Event::Discovered`].
129    /// Such a cookie can be used to only fetch the _delta_ of registrations since
130    /// the cookie was acquired.
131    pub fn discover(
132        &mut self,
133        namespace: Option<Namespace>,
134        cookie: Option<Cookie>,
135        limit: Option<u64>,
136        rendezvous_node: PeerId,
137    ) {
138        let req_id = self.inner.send_request(
139            &rendezvous_node,
140            Discover {
141                namespace: namespace.clone(),
142                cookie,
143                limit,
144            },
145        );
146
147        self.waiting_for_discovery
148            .insert(req_id, (rendezvous_node, namespace));
149    }
150}
151
152#[derive(Debug, thiserror::Error)]
153pub enum RegisterError {
154    #[error("We don't know about any externally reachable addresses of ours")]
155    NoExternalAddresses,
156    #[error("Failed to make a new PeerRecord")]
157    FailedToMakeRecord(#[from] SigningError),
158}
159
160#[derive(Debug)]
161#[allow(clippy::large_enum_variant)]
162pub enum Event {
163    /// We successfully discovered other nodes with using the contained rendezvous node.
164    Discovered {
165        rendezvous_node: PeerId,
166        registrations: Vec<Registration>,
167        cookie: Cookie,
168    },
169    /// We failed to discover other nodes on the contained rendezvous node.
170    DiscoverFailed {
171        rendezvous_node: PeerId,
172        namespace: Option<Namespace>,
173        error: ErrorCode,
174    },
175    /// We successfully registered with the contained rendezvous node.
176    Registered {
177        rendezvous_node: PeerId,
178        ttl: Ttl,
179        namespace: Namespace,
180    },
181    /// We failed to register with the contained rendezvous node.
182    RegisterFailed {
183        rendezvous_node: PeerId,
184        namespace: Namespace,
185        error: ErrorCode,
186    },
187    /// The connection details we learned from this node expired.
188    Expired { peer: PeerId },
189}
190
191impl NetworkBehaviour for Behaviour {
192    type ConnectionHandler = <libp2p_request_response::Behaviour<
193        crate::codec::Codec,
194    > as NetworkBehaviour>::ConnectionHandler;
195
196    type ToSwarm = Event;
197
198    fn handle_established_inbound_connection(
199        &mut self,
200        connection_id: ConnectionId,
201        peer: PeerId,
202        local_addr: &Multiaddr,
203        remote_addr: &Multiaddr,
204    ) -> Result<THandler<Self>, ConnectionDenied> {
205        self.inner.handle_established_inbound_connection(
206            connection_id,
207            peer,
208            local_addr,
209            remote_addr,
210        )
211    }
212
213    fn handle_established_outbound_connection(
214        &mut self,
215        connection_id: ConnectionId,
216        peer: PeerId,
217        addr: &Multiaddr,
218        role_override: Endpoint,
219        port_use: PortUse,
220    ) -> Result<THandler<Self>, ConnectionDenied> {
221        self.inner.handle_established_outbound_connection(
222            connection_id,
223            peer,
224            addr,
225            role_override,
226            port_use,
227        )
228    }
229
230    fn on_connection_handler_event(
231        &mut self,
232        peer_id: PeerId,
233        connection_id: ConnectionId,
234        event: THandlerOutEvent<Self>,
235    ) {
236        self.inner
237            .on_connection_handler_event(peer_id, connection_id, event);
238    }
239
240    fn on_swarm_event(&mut self, event: FromSwarm) {
241        let changed = self.external_addresses.on_swarm_event(&event);
242
243        self.inner.on_swarm_event(event);
244
245        if changed && self.external_addresses.iter().count() > 0 {
246            let registered = self.registered_namespaces.clone();
247            for ((rz_node, ns), ttl) in registered {
248                if let Err(e) = self.register(ns, rz_node, Some(ttl)) {
249                    tracing::warn!("refreshing registration failed: {e}")
250                }
251            }
252        }
253    }
254
255    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
256    fn poll(
257        &mut self,
258        cx: &mut Context<'_>,
259    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
260        use libp2p_request_response as req_res;
261
262        loop {
263            match self.inner.poll(cx) {
264                Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
265                    message:
266                        req_res::Message::Response {
267                            request_id,
268                            response,
269                        },
270                    ..
271                })) => {
272                    if let Some(event) = self.handle_response(&request_id, response) {
273                        return Poll::Ready(ToSwarm::GenerateEvent(event));
274                    }
275
276                    continue; // not a request we care about
277                }
278                Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
279                    request_id,
280                    ..
281                })) => {
282                    if let Some(event) = self.event_for_outbound_failure(&request_id) {
283                        return Poll::Ready(ToSwarm::GenerateEvent(event));
284                    }
285
286                    continue; // not a request we care about
287                }
288                Poll::Ready(ToSwarm::GenerateEvent(
289                    req_res::Event::InboundFailure { .. }
290                    | req_res::Event::ResponseSent { .. }
291                    | req_res::Event::Message {
292                        message: req_res::Message::Request { .. },
293                        ..
294                    },
295                )) => {
296                    unreachable!("rendezvous clients never receive requests")
297                }
298                Poll::Ready(other) => {
299                    let new_to_swarm =
300                        other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
301
302                    return Poll::Ready(new_to_swarm);
303                }
304                Poll::Pending => {}
305            }
306
307            if let Poll::Ready(Some(expired_registration)) =
308                self.expiring_registrations.poll_next_unpin(cx)
309            {
310                self.discovered_peers.remove(&expired_registration);
311                return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
312                    peer: expired_registration.0,
313                }));
314            }
315
316            return Poll::Pending;
317        }
318    }
319
320    fn handle_pending_outbound_connection(
321        &mut self,
322        _connection_id: ConnectionId,
323        maybe_peer: Option<PeerId>,
324        _addresses: &[Multiaddr],
325        _effective_role: Endpoint,
326    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
327        let peer = match maybe_peer {
328            None => return Ok(vec![]),
329            Some(peer) => peer,
330        };
331
332        let addresses = self
333            .discovered_peers
334            .iter()
335            .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses))
336            .flatten()
337            .cloned()
338            .collect();
339
340        Ok(addresses)
341    }
342}
343
344impl Behaviour {
345    fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
346        if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
347            return Some(Event::RegisterFailed {
348                rendezvous_node,
349                namespace,
350                error: ErrorCode::Unavailable,
351            });
352        };
353
354        if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
355            return Some(Event::DiscoverFailed {
356                rendezvous_node,
357                namespace,
358                error: ErrorCode::Unavailable,
359            });
360        };
361
362        None
363    }
364
365    fn handle_response(
366        &mut self,
367        request_id: &OutboundRequestId,
368        response: Message,
369    ) -> Option<Event> {
370        match response {
371            RegisterResponse(Ok(ttl)) => {
372                if let Some((rendezvous_node, namespace)) =
373                    self.waiting_for_register.remove(request_id)
374                {
375                    self.registered_namespaces
376                        .insert((rendezvous_node, namespace.clone()), ttl);
377
378                    return Some(Event::Registered {
379                        rendezvous_node,
380                        ttl,
381                        namespace,
382                    });
383                }
384
385                None
386            }
387            RegisterResponse(Err(error_code)) => {
388                if let Some((rendezvous_node, namespace)) =
389                    self.waiting_for_register.remove(request_id)
390                {
391                    return Some(Event::RegisterFailed {
392                        rendezvous_node,
393                        namespace,
394                        error: error_code,
395                    });
396                }
397
398                None
399            }
400            DiscoverResponse(Ok((registrations, cookie))) => {
401                if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
402                {
403                    self.discovered_peers
404                        .extend(registrations.iter().map(|registration| {
405                            let peer_id = registration.record.peer_id();
406                            let namespace = registration.namespace.clone();
407
408                            let addresses = registration.record.addresses().to_vec();
409
410                            ((peer_id, namespace), addresses)
411                        }));
412
413                    self.expiring_registrations
414                        .extend(registrations.iter().cloned().map(|registration| {
415                            async move {
416                                // if the timer errors we consider it expired
417                                futures_timer::Delay::new(Duration::from_secs(registration.ttl))
418                                    .await;
419
420                                (registration.record.peer_id(), registration.namespace)
421                            }
422                            .boxed()
423                        }));
424
425                    return Some(Event::Discovered {
426                        rendezvous_node,
427                        registrations,
428                        cookie,
429                    });
430                }
431
432                None
433            }
434            DiscoverResponse(Err(error_code)) => {
435                if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
436                    return Some(Event::DiscoverFailed {
437                        rendezvous_node,
438                        namespace: ns,
439                        error: error_code,
440                    });
441                }
442
443                None
444            }
445            _ => unreachable!("rendezvous clients never receive requests"),
446        }
447    }
448}