1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22
23use std::{
24 borrow::Borrow,
25 collections::{HashMap, VecDeque},
26 error::Error,
27 hash::{Hash, Hasher},
28 net::{self, IpAddr, SocketAddr, SocketAddrV4},
29 ops::{Deref, DerefMut},
30 pin::Pin,
31 task::{Context, Poll},
32 time::Duration,
33};
34
35use futures::{channel::oneshot, Future, StreamExt};
36use futures_timer::Delay;
37use igd_next::PortMappingProtocol;
38use libp2p_core::{
39 multiaddr,
40 transport::{ListenerId, PortUse},
41 Endpoint, Multiaddr,
42};
43use libp2p_swarm::{
44 derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm,
45 NetworkBehaviour, NewListenAddr, ToSwarm,
46};
47
48use crate::tokio::{is_addr_global, Gateway};
49
/// Duration requested for every port mapping; sent as the `duration` field of
/// `GatewayRequest::AddMapping`.
// NOTE(review): presumably expressed in seconds, like `MAPPING_TIMEOUT` — confirm against the gateway implementation.
const MAPPING_DURATION: u32 = 3600;

/// Number of seconds after which an active mapping is renewed: half of
/// `MAPPING_DURATION`, so the renewal is requested before the mapping runs out.
const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
55
/// A request sent to the [`Gateway`] through its `sender` channel.
#[derive(Debug)]
pub(crate) enum GatewayRequest {
    /// Ask the gateway to create (or renew) `mapping` for `duration`.
    AddMapping { mapping: Mapping, duration: u32 },
    /// Ask the gateway to remove the given mapping.
    RemoveMapping(Mapping),
}
62
/// An event received from the [`Gateway`] through its `receiver` channel,
/// reporting the outcome of a `GatewayRequest`.
#[derive(Debug)]
pub(crate) enum GatewayEvent {
    /// The mapping was successfully created or renewed.
    Mapped(Mapping),
    /// Creating or renewing the mapping failed with the given error.
    MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
    /// The mapping was successfully removed.
    Removed(Mapping),
    /// Removing the mapping failed with the given error.
    RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
}
75
/// A port mapping on the gateway, derived from one listen address.
///
/// Equality, hashing and `Borrow<ListenerId>` only consider `listener_id`.
#[derive(Debug, Clone)]
pub(crate) struct Mapping {
    /// Listener that reported the address this mapping was created for.
    pub(crate) listener_id: ListenerId,
    /// Transport protocol of the mapped port.
    pub(crate) protocol: PortMappingProtocol,
    /// The listen multiaddress the mapping was created from.
    pub(crate) multiaddr: Multiaddr,
    /// Socket address (IP and port) extracted from `multiaddr`.
    pub(crate) internal_addr: SocketAddr,
}
84
85impl Mapping {
86 fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr {
89 let addr = match gateway_addr {
90 net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip),
91 net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip),
92 };
93 self.multiaddr
94 .replace(0, |_| Some(addr))
95 .expect("multiaddr should be valid")
96 }
97}
98
99impl Hash for Mapping {
100 fn hash<H: Hasher>(&self, state: &mut H) {
101 self.listener_id.hash(state);
102 }
103}
104
105impl PartialEq for Mapping {
106 fn eq(&self, other: &Self) -> bool {
107 self.listener_id == other.listener_id
108 }
109}
110
111impl Eq for Mapping {}
112
113impl Borrow<ListenerId> for Mapping {
114 fn borrow(&self) -> &ListenerId {
115 &self.listener_id
116 }
117}
118
/// State of a single [`Mapping`] tracked in a `MappingList`.
#[derive(Debug)]
enum MappingState {
    /// Not requested on the gateway yet; `MappingList::renew` will request it.
    Inactive,
    /// A mapping or removal request has been sent to the gateway and its
    /// outcome has not been processed yet.
    Pending,
    /// The gateway confirmed the mapping; a renewal is requested once the
    /// inner [`Delay`] completes.
    Active(Delay),
    /// The gateway reported a mapping failure; `MappingList::renew` will
    /// request the mapping again.
    Failed,
}
131
/// State of the gateway used by `Behaviour`.
enum GatewayState {
    /// The gateway search is still running; the receiver yields its outcome.
    Searching(oneshot::Receiver<Result<Gateway, Box<dyn std::error::Error + Send + Sync>>>),
    /// A gateway was found and its external address passed `is_addr_global`.
    Available(Gateway),
    /// The gateway search finished with an error.
    GatewayNotFound,
    /// A gateway was found, but its external address (the inner `IpAddr`)
    /// did not pass `is_addr_global`.
    NonRoutableGateway(IpAddr),
}
139
/// Events emitted by `Behaviour` to the swarm.
#[derive(Debug)]
pub enum Event {
    /// A pending port mapping was confirmed by the gateway; the inner
    /// multiaddress is the resulting external address.
    NewExternalAddr(Multiaddr),
    /// A previously active port mapping failed to be renewed; the inner
    /// multiaddress is the external address that expired.
    ExpiredExternalAddr(Multiaddr),
    /// The gateway search finished without finding a gateway.
    GatewayNotFound,
    /// A gateway was found, but its external address is not global.
    NonRoutableGateway,
}
152
/// The tracked port mappings together with their current `MappingState`.
///
/// Thin wrapper around a `HashMap` (see the `Deref`/`DerefMut` impls).
#[derive(Debug, Default)]
struct MappingList(HashMap<Mapping, MappingState>);
156
157impl Deref for MappingList {
158 type Target = HashMap<Mapping, MappingState>;
159
160 fn deref(&self) -> &Self::Target {
161 &self.0
162 }
163}
164
165impl DerefMut for MappingList {
166 fn deref_mut(&mut self) -> &mut Self::Target {
167 &mut self.0
168 }
169}
170
171impl MappingList {
172 fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175 for (mapping, state) in self.iter_mut() {
176 match state {
177 MappingState::Inactive | MappingState::Failed => {
178 let duration = MAPPING_DURATION;
179 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180 mapping: mapping.clone(),
181 duration,
182 }) {
183 tracing::debug!(
184 multiaddress=%mapping.multiaddr,
185 "could not request port mapping for multiaddress on the gateway: {}",
186 err
187 );
188 }
189 *state = MappingState::Pending;
190 }
191 MappingState::Active(timeout) => {
192 if Pin::new(timeout).poll(cx).is_ready() {
193 let duration = MAPPING_DURATION;
194 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
195 mapping: mapping.clone(),
196 duration,
197 }) {
198 tracing::debug!(
199 multiaddress=%mapping.multiaddr,
200 "could not request port mapping for multiaddress on the gateway: {}",
201 err
202 );
203 }
204 }
205 }
206 MappingState::Pending => {}
207 }
208 }
209 }
210}
211
/// A `NetworkBehaviour` that requests UPnP port mappings on the gateway for
/// new listen addresses and reports the resulting external addresses.
pub struct Behaviour {
    /// Current state of the gateway (searching, available, or unusable).
    state: GatewayState,

    /// Port mappings being tracked, with their state.
    mappings: MappingList,

    /// Events queued to be emitted on subsequent calls to `poll`.
    pending_events: VecDeque<Event>,
}
224
225impl Default for Behaviour {
226 fn default() -> Self {
227 Self {
228 state: GatewayState::Searching(crate::tokio::search_gateway()),
229 mappings: Default::default(),
230 pending_events: VecDeque::new(),
231 }
232 }
233}
234
235impl NetworkBehaviour for Behaviour {
236 type ConnectionHandler = dummy::ConnectionHandler;
237
238 type ToSwarm = Event;
239
240 fn handle_established_inbound_connection(
241 &mut self,
242 _connection_id: ConnectionId,
243 _peer: PeerId,
244 _local_addr: &Multiaddr,
245 _remote_addr: &Multiaddr,
246 ) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
247 Ok(dummy::ConnectionHandler)
248 }
249
250 fn handle_established_outbound_connection(
251 &mut self,
252 _connection_id: ConnectionId,
253 _peer: PeerId,
254 _addr: &Multiaddr,
255 _role_override: Endpoint,
256 _port_use: PortUse,
257 ) -> Result<libp2p_swarm::THandler<Self>, libp2p_swarm::ConnectionDenied> {
258 Ok(dummy::ConnectionHandler)
259 }
260
261 fn on_swarm_event(&mut self, event: FromSwarm) {
262 match event {
263 FromSwarm::NewListenAddr(NewListenAddr {
264 listener_id,
265 addr: multiaddr,
266 }) => {
267 let (addr, protocol) = match multiaddr_to_socketaddr_protocol(multiaddr.clone()) {
268 Ok(addr_port) => addr_port,
269 Err(()) => {
270 tracing::debug!("multiaddress not supported for UPnP {multiaddr}");
271 return;
272 }
273 };
274
275 if let Some((mapping, _state)) = self
276 .mappings
277 .iter()
278 .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
279 {
280 tracing::debug!(
281 multiaddress=%multiaddr,
282 mapped_multiaddress=%mapping.multiaddr,
283 "port from multiaddress is already being mapped"
284 );
285 return;
286 }
287
288 match &mut self.state {
289 GatewayState::Searching(_) => {
290 self.mappings.insert(
294 Mapping {
295 listener_id,
296 protocol,
297 internal_addr: addr,
298 multiaddr: multiaddr.clone(),
299 },
300 MappingState::Inactive,
301 );
302 }
303 GatewayState::Available(ref mut gateway) => {
304 let mapping = Mapping {
305 listener_id,
306 protocol,
307 internal_addr: addr,
308 multiaddr: multiaddr.clone(),
309 };
310
311 let duration = MAPPING_DURATION;
312 if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
313 mapping: mapping.clone(),
314 duration,
315 }) {
316 tracing::debug!(
317 multiaddress=%mapping.multiaddr,
318 "could not request port mapping for multiaddress on the gateway: {}",
319 err
320 );
321 }
322
323 self.mappings.insert(mapping, MappingState::Pending);
324 }
325 GatewayState::GatewayNotFound => {
326 tracing::debug!(
327 multiaddres=%multiaddr,
328 "network gateway not found, UPnP port mapping of multiaddres discarded"
329 );
330 }
331 GatewayState::NonRoutableGateway(addr) => {
332 tracing::debug!(
333 multiaddress=%multiaddr,
334 network_gateway_ip=%addr,
335 "the network gateway is not exposed to the public network. /
336 UPnP port mapping of multiaddress discarded"
337 );
338 }
339 };
340 }
341 FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
342 listener_id,
343 addr: _addr,
344 }) => {
345 if let GatewayState::Available(ref mut gateway) = &mut self.state {
346 if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) {
347 if let Err(err) = gateway
348 .sender
349 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
350 {
351 tracing::debug!(
352 multiaddress=%mapping.multiaddr,
353 "could not request port removal for multiaddress on the gateway: {}",
354 err
355 );
356 }
357 self.mappings.insert(mapping, MappingState::Pending);
358 }
359 }
360 }
361 _ => {}
362 }
363 }
364
365 fn on_connection_handler_event(
366 &mut self,
367 _peer_id: PeerId,
368 _connection_id: ConnectionId,
369 event: libp2p_swarm::THandlerOutEvent<Self>,
370 ) {
371 libp2p_core::util::unreachable(event)
372 }
373
374 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
375 fn poll(
376 &mut self,
377 cx: &mut Context<'_>,
378 ) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
379 if let Some(event) = self.pending_events.pop_front() {
381 return Poll::Ready(ToSwarm::GenerateEvent(event));
382 }
383
384 loop {
387 match self.state {
388 GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
389 Poll::Ready(result) => {
390 match result.expect("sender shouldn't have been dropped") {
391 Ok(gateway) => {
392 if !is_addr_global(gateway.external_addr) {
393 self.state =
394 GatewayState::NonRoutableGateway(gateway.external_addr);
395 tracing::debug!(
396 gateway_address=%gateway.external_addr,
397 "the gateway is not routable"
398 );
399 return Poll::Ready(ToSwarm::GenerateEvent(
400 Event::NonRoutableGateway,
401 ));
402 }
403 self.state = GatewayState::Available(gateway);
404 }
405 Err(err) => {
406 tracing::debug!("could not find gateway: {err}");
407 self.state = GatewayState::GatewayNotFound;
408 return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound));
409 }
410 }
411 }
412 Poll::Pending => return Poll::Pending,
413 },
414 GatewayState::Available(ref mut gateway) => {
415 if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
417 match result {
418 GatewayEvent::Mapped(mapping) => {
419 let new_state = MappingState::Active(Delay::new(
420 Duration::from_secs(MAPPING_TIMEOUT),
421 ));
422
423 match self
424 .mappings
425 .insert(mapping.clone(), new_state)
426 .expect("mapping should exist")
427 {
428 MappingState::Pending => {
429 let external_multiaddr =
430 mapping.external_addr(gateway.external_addr);
431 self.pending_events.push_back(Event::NewExternalAddr(
432 external_multiaddr.clone(),
433 ));
434 tracing::debug!(
435 address=%mapping.internal_addr,
436 protocol=%mapping.protocol,
437 "successfully mapped UPnP for protocol"
438 );
439 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
440 external_multiaddr,
441 ));
442 }
443 MappingState::Active(_) => {
444 tracing::debug!(
445 address=%mapping.internal_addr,
446 protocol=%mapping.protocol,
447 "successfully renewed UPnP mapping for protocol"
448 );
449 }
450 _ => unreachable!(),
451 }
452 }
453 GatewayEvent::MapFailure(mapping, err) => {
454 match self
455 .mappings
456 .insert(mapping.clone(), MappingState::Failed)
457 .expect("mapping should exist")
458 {
459 MappingState::Active(_) => {
460 tracing::debug!(
461 address=%mapping.internal_addr,
462 protocol=%mapping.protocol,
463 "failed to remap UPnP mapped for protocol: {err}"
464 );
465 let external_multiaddr =
466 mapping.external_addr(gateway.external_addr);
467 self.pending_events.push_back(Event::ExpiredExternalAddr(
468 external_multiaddr.clone(),
469 ));
470 return Poll::Ready(ToSwarm::ExternalAddrExpired(
471 external_multiaddr,
472 ));
473 }
474 MappingState::Pending => {
475 tracing::debug!(
476 address=%mapping.internal_addr,
477 protocol=%mapping.protocol,
478 "failed to map UPnP mapped for protocol: {err}"
479 );
480 }
481 _ => {
482 unreachable!()
483 }
484 }
485 }
486 GatewayEvent::Removed(mapping) => {
487 tracing::debug!(
488 address=%mapping.internal_addr,
489 protocol=%mapping.protocol,
490 "successfully removed UPnP mapping for protocol"
491 );
492 self.mappings
493 .remove(&mapping)
494 .expect("mapping should exist");
495 }
496 GatewayEvent::RemovalFailure(mapping, err) => {
497 tracing::debug!(
498 address=%mapping.internal_addr,
499 protocol=%mapping.protocol,
500 "could not remove UPnP mapping for protocol: {err}"
501 );
502 if let Err(err) = gateway
503 .sender
504 .try_send(GatewayRequest::RemoveMapping(mapping.clone()))
505 {
506 tracing::debug!(
507 multiaddress=%mapping.multiaddr,
508 "could not request port removal for multiaddress on the gateway: {}",
509 err
510 );
511 }
512 }
513 }
514 }
515
516 self.mappings.renew(gateway, cx);
518 return Poll::Pending;
519 }
520 _ => return Poll::Pending,
521 }
522 }
523 }
524}
525
526fn multiaddr_to_socketaddr_protocol(
531 addr: Multiaddr,
532) -> Result<(SocketAddr, PortMappingProtocol), ()> {
533 let mut iter = addr.into_iter();
534 match iter.next() {
535 Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() {
537 Some(multiaddr::Protocol::Tcp(port)) => {
538 return Ok((
539 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
540 PortMappingProtocol::TCP,
541 ));
542 }
543 Some(multiaddr::Protocol::Udp(port)) => {
544 return Ok((
545 SocketAddr::V4(SocketAddrV4::new(ipv4, port)),
546 PortMappingProtocol::UDP,
547 ));
548 }
549 _ => {}
550 },
551 _ => {}
552 }
553 Err(())
554}