libp2p_upnp/
behaviour.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24    borrow::Borrow,
25    collections::{HashMap, VecDeque},
26    error::Error,
27    hash::{Hash, Hasher},
28    net::{self, IpAddr, SocketAddr, SocketAddrV4},
29    ops::{Deref, DerefMut},
30    pin::Pin,
31    task::{Context, Poll},
32    time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39    multiaddr,
40    transport::{ListenerId, PortUse},
41    Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44    derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45    NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
50/// The duration in seconds of a port mapping on the gateway.
51const MAPPING_DURATION: u32 = 3600;
52
53/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped.
54const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
55
56/// A [`Gateway`] Request.
57#[derive(Debug)]
58pub(crate) enum GatewayRequest {
59    AddMapping { mapping: Mapping, duration: u32 },
60    RemoveMapping(Mapping),
61}
62
63/// A [`Gateway`] event.
64#[derive(Debug)]
65pub(crate) enum GatewayEvent {
66    /// Port was successfully mapped.
67    Mapped(Mapping),
68    /// There was a failure mapping port.
69    MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
70    /// Port was successfully removed.
71    Removed(Mapping),
72    /// There was a failure removing the mapped port.
73    RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
74}
75
76/// Mapping of a Protocol and Port on the gateway.
77#[derive(Debug, Clone)]
78pub(crate) struct Mapping {
79    pub(crate) listener_id: ListenerId,
80    pub(crate) protocol: PortMappingProtocol,
81    pub(crate) multiaddr: Multiaddr,
82    pub(crate) internal_addr: SocketAddr,
83}
84
85impl Mapping {
86    /// Given the input gateway address, calculate the
87    /// open external `Multiaddr`.
88    fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
89        let addr = match gateway_addr {
90            net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
91            net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
92        };
93        self.multiaddr
94            .replace(0, |_| Some(addr))
95            .expect("multiaddr should be valid")
96    }
97}
98
99impl Hash for Mapping {
100    fn hash<H: Hasher>(&self, state: &mut H) {
101        self.listener_id.hash(state);
102    }
103}
104
105impl PartialEq for Mapping {
106    fn eq(&self, other: &Self) -> bool {
107        self.listener_id == other.listener_id
108    }
109}
110
111impl Eq for Mapping {}
112
113impl Borrow<ListenerId> for Mapping {
114    fn borrow(&self) -> &ListenerId {
115        &self.listener_id
116    }
117}
118
119/// Current state of a [`Mapping`].
120#[derive(Debug)]
121enum MappingState {
122    /// Port mapping is inactive, will be requested or re-requested on the next iteration.
123    Inactive,
124    /// Port mapping/removal has been requested on the gateway.
125    Pending,
126    /// Port mapping is active with the inner timeout.
127    Active(Delay),
128    /// Port mapping failed, we will try again.
129    Failed,
130}
131
132/// Current state of the UPnP [`Gateway`].
133enum GatewayState {
134    Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
135    Available(Gateway),
136    GatewayNotFound,
137    NonRoutableGateway(IpAddr),
138}
139
140/// The event produced by `Behaviour`.
141#[derive(Debug)]
142pub enum Event {
143    /// The multiaddress is reachable externally.
144    NewExternalAddr(Multiaddr),
145    /// The renewal of the multiaddress on the gateway failed.
146    ExpiredExternalAddr(Multiaddr),
147    /// The IGD gateway was not found.
148    GatewayNotFound,
149    /// The Gateway is not exposed directly to the public network.
150    NonRoutableGateway,
151}
152
153/// A list of port mappings and its state.
154#[derive(Debug, Default)]
155struct MappingList(HashMap<Mapping, MappingState>);
156
157impl Deref for MappingList {
158    type Target = HashMap<Mapping, MappingState>;
159
160    fn deref(&self) -> &Self::Target {
161        &self.0
162    }
163}
164
165impl DerefMut for MappingList {
166    fn deref_mut(&mut self) -> &mut Self::Target {
167        &mut self.0
168    }
169}
170
171impl MappingList {
172    /// Queue for renewal the current mapped ports on the `Gateway` that are expiring,
173    /// and try to activate the inactive.
174    fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175        for (mapping, state) in self.iter_mut() {
176            match state {
177                MappingState::Inactive | MappingState::Failed => {
178                    let duration = MAPPING_DURATION;
179                    if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180                        mapping: mapping.clone(),
181                        duration,
182                    }) {
183                        tracing::debug!(
184                            multiaddress=%mapping.multiaddr,
185                            "could not request port mapping for multiaddress on the gateway: {}",
186                            err
187                        );
188                    }
189                    *state = MappingState::Pending;
190                }
191                MappingState::Active(timeout) => {
192                    if Pin::new(timeout).poll(cx).is_ready() {
193                        let duration = MAPPING_DURATION;
194                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
195                            mapping: mapping.clone(),
196                            duration,
197                        }) {
198                            tracing::debug!(
199                                multiaddress=%mapping.multiaddr,
200                                "could not request port mapping for multiaddress on the gateway: {}",
201                                err
202                            );
203                        }
204                    }
205                }
206                MappingState::Pending => {}
207            }
208        }
209    }
210}
211
212/// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port
213/// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`].
214pub struct Behaviour {
215    /// UPnP interface state.
216    state: GatewayState,
217
218    /// List of port mappings.
219    mappings: MappingList,
220
221    /// Pending behaviour events to be emitted.
222    pending_events: VecDeque<Event>,
223}
224
225impl Default for Behaviour {
226    fn default() -> Self {
227        Self {
228            state: GatewayState::Searching(crate::tokio::search_gateway()),
229            mappings: Default::default(),
230            pending_events: VecDeque::new(),
231        }
232    }
233}
234
235impl NetworkBehaviour for Behaviour {
236    type ConnectionHandler = dummy::ConnectionHandler;
237
238    type ToSwarm = Event;
239
240    fn handle_established_inbound_connection(
241        &mut self,
242        _connection_id: ConnectionId,
243        _peer: PeerId,
244        _local_addr: &Multiaddr,
245        _remote_addr: &Multiaddr,
246    ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
247        Ok(dummy::ConnectionHandler)
248    }
249
250    fn handle_established_outbound_connection(
251        &mut self,
252        _connection_id: ConnectionId,
253        _peer: PeerId,
254        _addr: &Multiaddr,
255        _role_override: Endpoint,
256        _port_use: PortUse,
257    ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
258        Ok(dummy::ConnectionHandler)
259    }
260
261    fn on_swarm_event(&mut self, event: FromSwarm) {
262        match event {
263            FromSwarm::NewListenAddr(NewListenAddr {
264                listener_id,
265                addr: multiaddr,
266            }) => {
267                let (addr, protocol) = match multiaddr_to_socketaddr_protocol(multiaddr.clone()) {
268                    Ok(addr_port) => addr_port,
269                    Err(()) => {
270                        tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
271                        return;
272                    }
273                };
274
275                if let Some((mapping, _state)) = self
276                    .mappings
277                    .iter()
278                    .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
279                {
280                    tracing::debug!(
281                        multiaddress=%multiaddr,
282                        mapped_multiaddress=%mapping.multiaddr,
283                        "port from multiaddress is already being mapped"
284                    );
285                    return;
286                }
287
288                match &mut self.state {
289                    GatewayState::Searching(_) => {
290                        // As the gateway is not yet available we add the mapping with
291                        // `MappingState::Inactive` so that when and if it
292                        // becomes available we map it.
293                        self.mappings.insert(
294                            Mapping {
295                                listener_id,
296                                protocol,
297                                internal_addr: addr,
298                                multiaddr: multiaddr.clone(),
299                            },
300                            MappingState::Inactive,
301                        );
302                    }
303                    GatewayState::Available(ref mut gateway) => {
304                        let mapping = Mapping {
305                            listener_id,
306                            protocol,
307                            internal_addr: addr,
308                            multiaddr: multiaddr.clone(),
309                        };
310
311                        let duration = MAPPING_DURATION;
312                        if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
313                            mapping: mapping.clone(),
314                            duration,
315                        }) {
316                            tracing::debug!(
317                                multiaddress=%mapping.multiaddr,
318                                "could not request port mapping for multiaddress on the gateway: {}",
319                                err
320                            );
321                        }
322
323                        self.mappings.insert(mapping, MappingState::Pending);
324                    }
325                    GatewayState::GatewayNotFound => {
326                        tracing::debug!(
327                            multiaddres=%multiaddr,
328                            "network gateway not found, UPnP port mapping of multiaddres discarded"
329                        );
330                    }
331                    GatewayState::NonRoutableGateway(addr) => {
332                        tracing::debug!(
333                            multiaddress=%multiaddr,
334                            network_gateway_ip=%addr,
335                            "the network gateway is not exposed to the public network. /
336                             UPnP port mapping of multiaddress discarded"
337                        );
338                    }
339                };
340            }
341            FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
342                listener_id,
343                addr: _addr,
344            }) => {
345                if let GatewayState::Available(ref mut gateway) = &mut self.state {
346                    if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
347                        if let Err(err) = gateway
348                            .sender
349                            .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
350                        {
351                            tracing::debug!(
352                                multiaddress=%mapping.multiaddr,
353                                "could not request port removal for multiaddress on the gateway: {}",
354                                err
355                            );
356                        }
357                        self.mappings.insert(mapping, MappingState::Pending);
358                    }
359                }
360            }
361            _ => {}
362        }
363    }
364
365    fn on_connection_handler_event(
366        &mut self,
367        _peer_id: PeerId,
368        _connection_id: ConnectionId,
369        event: libp2p_swarm::THandlerOutEvent<Self>,
370    ) {
371        libp2p_core::util::unreachable(event)
372    }
373
374    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
375    fn poll(
376        &mut self,
377        cx: &mut Context<'_>,
378    ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
379        // If there are pending addresses to be emitted we emit them.
380        if let Some(event) = self.pending_events.pop_front() {
381            return Poll::Ready(ToSwarm::GenerateEvent(event));
382        }
383
384        // Loop through the gateway state so that if it changes from `Searching` to `Available`
385        // we poll the pending mapping requests.
386        loop {
387            match self.state {
388                GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
389                    Poll::Ready(result) => {
390                        match result.expect("sender shouldn't have been dropped") {
391                            Ok(gateway) => {
392                                if !is_addr_global(gateway.external_addr) {
393                                    self.state =
394                                        GatewayState::NonRoutableGateway(gateway.external_addr);
395                                    tracing::debug!(
396                                        gateway_address=%gateway.external_addr,
397                                        "the gateway is not routable"
398                                    );
399                                    return Poll::Ready(ToSwarm::GenerateEvent(
400                                        Event::NonRoutableGateway,
401                                    ));
402                                }
403                                self.state = GatewayState::Available(gateway);
404                            }
405                            Err(err) => {
406                                tracing::debug!("could not find gateway: {err}");
407                                self.state = GatewayState::GatewayNotFound;
408                                return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
409                            }
410                        }
411                    }
412                    Poll::Pending => return Poll::Pending,
413                },
414                GatewayState::Available(ref mut gateway) => {
415                    // Poll pending mapping requests.
416                    if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
417                        match result {
418                            GatewayEvent::Mapped(mapping) => {
419                                let new_state = MappingState::Active(Delay::new(
420                                    Duration::from_secs(MAPPING_TIMEOUT),
421                                ));
422
423                                match self
424                                    .mappings
425                                    .insert(mapping.clone(), new_state)
426                                    .expect("mapping should exist")
427                                {
428                                    MappingState::Pending => {
429                                        let external_multiaddr =
430                                            mapping.external_addr(gateway.external_addr);
431                                        self.pending_events.push_back(Event::NewExternalAddr(
432                                            external_multiaddr.clone(),
433                                        ));
434                                        tracing::debug!(
435                                            address=%mapping.internal_addr,
436                                            protocol=%mapping.protocol,
437                                            "successfully mapped UPnP for protocol"
438                                        );
439                                        return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
440                                            external_multiaddr,
441                                        ));
442                                    }
443                                    MappingState::Active(_) => {
444                                        tracing::debug!(
445                                            address=%mapping.internal_addr,
446                                            protocol=%mapping.protocol,
447                                            "successfully renewed UPnP mapping for protocol"
448                                        );
449                                    }
450                                    _ => unreachable!(),
451                                }
452                            }
453                            GatewayEvent::MapFailure(mapping, err) => {
454                                match self
455                                    .mappings
456                                    .insert(mapping.clone(), MappingState::Failed)
457                                    .expect("mapping should exist")
458                                {
459                                    MappingState::Active(_) => {
460                                        tracing::debug!(
461                                            address=%mapping.internal_addr,
462                                            protocol=%mapping.protocol,
463                                            "failed to remap UPnP mapped for protocol: {err}"
464                                        );
465                                        let external_multiaddr =
466                                            mapping.external_addr(gateway.external_addr);
467                                        self.pending_events.push_back(Event::ExpiredExternalAddr(
468                                            external_multiaddr.clone(),
469                                        ));
470                                        return Poll::Ready(ToSwarm::ExternalAddrExpired(
471                                            external_multiaddr,
472                                        ));
473                                    }
474                                    MappingState::Pending => {
475                                        tracing::debug!(
476                                            address=%mapping.internal_addr,
477                                            protocol=%mapping.protocol,
478                                            "failed to map UPnP mapped for protocol: {err}"
479                                        );
480                                    }
481                                    _ => {
482                                        unreachable!()
483                                    }
484                                }
485                            }
486                            GatewayEvent::Removed(mapping) => {
487                                tracing::debug!(
488                                    address=%mapping.internal_addr,
489                                    protocol=%mapping.protocol,
490                                    "successfully removed UPnP mapping for protocol"
491                                );
492                                self.mappings
493                                    .remove(&mapping)
494                                    .expect("mapping should exist");
495                            }
496                            GatewayEvent::RemovalFailure(mapping, err) => {
497                                tracing::debug!(
498                                    address=%mapping.internal_addr,
499                                    protocol=%mapping.protocol,
500                                    "could not remove UPnP mapping for protocol: {err}"
501                                );
502                                if let Err(err) = gateway
503                                    .sender
504                                    .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
505                                {
506                                    tracing::debug!(
507                                        multiaddress=%mapping.multiaddr,
508                                        "could not request port removal for multiaddress on the gateway: {}",
509                                        err
510                                    );
511                                }
512                            }
513                        }
514                    }
515
516                    // Renew expired and request inactive mappings.
517                    self.mappings.renew(gateway, cx);
518                    return Poll::Pending;
519                }
520                _ => return Poll::Pending,
521            }
522        }
523    }
524}
525
526/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`].
527///
528/// Fails if the given [`Multiaddr`] does not begin with an IP
529/// protocol encapsulating a TCP or UDP port.
530fn multiaddr_to_socketaddr_protocol(
531    addr: Multiaddr,
532) -> Result<(SocketAddr, PortMappingProtocol), ()> {
533    let mut iter = addr.into_iter();
534    match iter.next() {
535        // Idg only supports Ipv4.
536        Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
537            Some(multiaddr::Protocol::Tcp(port)) => {
538                return Ok((
539                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
540                    PortMappingProtocol::TCP,
541                ));
542            }
543            Some(multiaddr::Protocol::Udp(port)) => {
544                return Ok((
545                    SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
546                    PortMappingProtocol::UDP,
547                ));
548            }
549            _ => {}
550        },
551        _ => {}
552    }
553    Err(())
554}