libp2p_autonat/v1/behaviour/
as_client.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    collections::{HashMap, HashSet, VecDeque},
23    task::{Context, Poll},
24    time::Duration,
25};
26
27use futures::FutureExt;
28use futures_timer::Delay;
29use libp2p_core::Multiaddr;
30use libp2p_identity::PeerId;
31use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId};
32use libp2p_swarm::{ConnectionId, ListenAddresses, ToSwarm};
33use rand::{seq::SliceRandom, thread_rng};
34use web_time::Instant;
35
36use super::{
37    Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus,
38    ProbeId,
39};
40use crate::ResponseError;
41
/// Outbound probe failed or was aborted.
#[derive(Debug)]
pub enum OutboundProbeError {
    /// Probe was aborted because no server is known, or all servers
    /// are throttled through [`Config::throttle_server_period`].
    NoServer,
    /// Probe was aborted because the local peer has no listening or
    /// external addresses, i.e. there is no address a server could be
    /// asked to dial back.
    NoAddresses,
    /// Sending the dial-back request or receiving a response failed.
    OutboundRequest(OutboundFailure),
    /// The server refused or failed to dial us.
    Response(ResponseError),
}
56
/// Progress or outcome of an outbound probe, i.e. of a dial-back request
/// that the local peer sends (or tried to send) to a server.
#[derive(Debug)]
pub enum OutboundProbeEvent {
    /// A dial-back request was sent to a remote peer.
    Request {
        /// Id of the probe that this request belongs to.
        probe_id: ProbeId,
        /// Peer to which the request is sent.
        peer: PeerId,
    },
    /// The remote successfully dialed one of our addresses.
    Response {
        /// Id of the probe that this response belongs to.
        probe_id: ProbeId,
        /// Id of the peer that sent the response.
        peer: PeerId,
        /// The address at which the remote succeeded to dial us.
        address: Multiaddr,
    },
    /// The outbound request failed, was rejected, or the remote could dial
    /// none of our addresses.
    Error {
        /// Id of the probe that failed or was aborted.
        probe_id: ProbeId,
        /// Id of the peer used for the probe.
        /// `None` if the probe was aborted due to no addresses or no qualified server.
        peer: Option<PeerId>,
        /// Reason for the failure.
        error: OutboundProbeError,
    },
}
83
/// View over [`super::Behaviour`] in a client role.
///
/// The view only borrows its state; every field except `local_peer_id` is a
/// reference to data owned elsewhere.
pub(crate) struct AsClient<'a> {
    /// Request-response behaviour through which dial-back requests are sent.
    pub(crate) inner: &'a mut request_response::Behaviour<AutoNatCodec>,
    /// Our own peer id; sent along in every dial-back request.
    pub(crate) local_peer_id: PeerId,
    /// Configuration (intervals, confidence limit, throttling, server selection).
    pub(crate) config: &'a Config,
    /// Connected peers with, per connection, an optional address.
    /// If `Config::use_connected` is set, peers with at least one `Some`
    /// address are considered as probe servers.
    pub(crate) connected: &'a HashMap<PeerId, HashMap<ConnectionId, Option<Multiaddr>>>,
    /// Source of the id for the next probe.
    pub(crate) probe_id: &'a mut ProbeId,
    /// Known servers that are always considered for probes.
    pub(crate) servers: &'a HashSet<PeerId>,
    /// Servers that were recently used for a probe, together with the time
    /// the request was sent. Entries are appended after each request and
    /// removed from the front once the throttle period has passed.
    pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>,
    /// The currently assumed NAT status.
    pub(crate) nat_status: &'a mut NatStatus,
    /// Confidence in `nat_status`; raised up to `Config::confidence_max` by
    /// confirming probes and lowered by contradicting ones.
    pub(crate) confidence: &'a mut usize,
    /// Dial-back requests that are still awaiting a response, mapped to
    /// their probe id.
    pub(crate) ongoing_outbound: &'a mut HashMap<OutboundRequestId, ProbeId>,
    /// Time of the last probe attempt; `None` if no probe was attempted yet.
    pub(crate) last_probe: &'a mut Option<Instant>,
    /// Timer that triggers the next automatic probe when it elapses.
    pub(crate) schedule_probe: &'a mut Delay,
    /// Local listen addresses; offered to the server for the dial-back.
    pub(crate) listen_addresses: &'a ListenAddresses,
    /// Further candidate addresses; offered to the server for the dial-back.
    pub(crate) other_candidates: &'a HashSet<Multiaddr>,
}
101
102impl HandleInnerEvent for AsClient<'_> {
103    fn handle_event(
104        &mut self,
105        event: request_response::Event<DialRequest, DialResponse>,
106    ) -> VecDeque<Action> {
107        match event {
108            request_response::Event::Message {
109                peer,
110                message:
111                    request_response::Message::Response {
112                        request_id,
113                        response,
114                    },
115                ..
116            } => {
117                tracing::debug!(?response, "Outbound dial-back request returned response");
118
119                let probe_id = self
120                    .ongoing_outbound
121                    .remove(&request_id)
122                    .expect("OutboundRequestId exists.");
123
124                let event = match response.result.clone() {
125                    Ok(address) => OutboundProbeEvent::Response {
126                        probe_id,
127                        peer,
128                        address,
129                    },
130                    Err(e) => OutboundProbeEvent::Error {
131                        probe_id,
132                        peer: Some(peer),
133                        error: OutboundProbeError::Response(e),
134                    },
135                };
136
137                let mut actions = VecDeque::with_capacity(3);
138
139                actions.push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event)));
140
141                if let Some(old) = self.handle_reported_status(response.result.clone().into()) {
142                    actions.push_back(ToSwarm::GenerateEvent(Event::StatusChanged {
143                        old,
144                        new: self.nat_status.clone(),
145                    }));
146                }
147
148                if let Ok(address) = response.result {
149                    actions.push_back(ToSwarm::ExternalAddrConfirmed(address));
150                }
151
152                actions
153            }
154            request_response::Event::OutboundFailure {
155                peer,
156                error,
157                request_id,
158                ..
159            } => {
160                tracing::debug!(
161                    %peer,
162                    "Outbound Failure {} when on dial-back request to peer.",
163                    error,
164                );
165                let probe_id = self
166                    .ongoing_outbound
167                    .remove(&request_id)
168                    .unwrap_or_else(|| self.probe_id.next());
169
170                self.schedule_probe.reset(Duration::ZERO);
171
172                VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe(
173                    OutboundProbeEvent::Error {
174                        probe_id,
175                        peer: Some(peer),
176                        error: OutboundProbeError::OutboundRequest(error),
177                    },
178                ))])
179            }
180            _ => VecDeque::default(),
181        }
182    }
183}
184
185impl AsClient<'_> {
186    pub(crate) fn poll_auto_probe(&mut self, cx: &mut Context<'_>) -> Poll<OutboundProbeEvent> {
187        match self.schedule_probe.poll_unpin(cx) {
188            Poll::Ready(()) => {
189                self.schedule_probe.reset(self.config.retry_interval);
190
191                let addresses = self
192                    .other_candidates
193                    .iter()
194                    .chain(self.listen_addresses.iter())
195                    .cloned()
196                    .collect();
197
198                let probe_id = self.probe_id.next();
199                let event = match self.do_probe(probe_id, addresses) {
200                    Ok(peer) => OutboundProbeEvent::Request { probe_id, peer },
201                    Err(error) => {
202                        self.handle_reported_status(NatStatus::Unknown);
203                        OutboundProbeEvent::Error {
204                            probe_id,
205                            peer: None,
206                            error,
207                        }
208                    }
209                };
210                Poll::Ready(event)
211            }
212            Poll::Pending => Poll::Pending,
213        }
214    }
215
216    // An inbound connection can indicate that we are public; adjust the delay to the next probe.
217    pub(crate) fn on_inbound_connection(&mut self) {
218        if *self.confidence == self.config.confidence_max {
219            if self.nat_status.is_public() {
220                self.schedule_next_probe(self.config.refresh_interval * 2);
221            } else {
222                self.schedule_next_probe(self.config.refresh_interval / 5);
223            }
224        }
225    }
226
227    pub(crate) fn on_new_address(&mut self) {
228        if !self.nat_status.is_public() {
229            // New address could be publicly reachable, trigger retry.
230            if *self.confidence > 0 {
231                *self.confidence -= 1;
232            }
233            self.schedule_next_probe(self.config.retry_interval);
234        }
235    }
236
237    pub(crate) fn on_expired_address(&mut self, addr: &Multiaddr) {
238        if let NatStatus::Public(public_address) = self.nat_status {
239            if public_address == addr {
240                *self.confidence = 0;
241                *self.nat_status = NatStatus::Unknown;
242                self.schedule_next_probe(Duration::ZERO);
243            }
244        }
245    }
246
247    // Select a random server for the probe.
248    fn random_server(&mut self) -> Option<PeerId> {
249        // Update list of throttled servers.
250        let i = self.throttled_servers.partition_point(|(_, time)| {
251            *time + self.config.throttle_server_period < Instant::now()
252        });
253        self.throttled_servers.drain(..i);
254
255        let mut servers: Vec<&PeerId> = self.servers.iter().collect();
256
257        if self.config.use_connected {
258            servers.extend(self.connected.iter().filter_map(|(id, addrs)| {
259                // Filter servers for which no qualified address is known.
260                // This is the case if the connection is relayed or the address is
261                // not global (in case of Config::only_global_ips).
262                addrs.values().any(|a| a.is_some()).then_some(id)
263            }));
264        }
265
266        servers.retain(|s| !self.throttled_servers.iter().any(|(id, _)| s == &id));
267
268        servers.choose(&mut thread_rng()).map(|&&p| p)
269    }
270
271    // Send a dial-request to a randomly selected server.
272    // Returns the server that is used in this probe.
273    // `Err` if there are no qualified servers or no addresses.
274    fn do_probe(
275        &mut self,
276        probe_id: ProbeId,
277        addresses: Vec<Multiaddr>,
278    ) -> Result<PeerId, OutboundProbeError> {
279        let _ = self.last_probe.insert(Instant::now());
280        if addresses.is_empty() {
281            tracing::debug!("Outbound dial-back request aborted: No dial-back addresses");
282            return Err(OutboundProbeError::NoAddresses);
283        }
284
285        let server = self.random_server().ok_or(OutboundProbeError::NoServer)?;
286
287        let request_id = self.inner.send_request(
288            &server,
289            DialRequest {
290                peer_id: self.local_peer_id,
291                addresses,
292            },
293        );
294        self.throttled_servers.push((server, Instant::now()));
295        tracing::debug!(peer=%server, "Send dial-back request to peer");
296        self.ongoing_outbound.insert(request_id, probe_id);
297        Ok(server)
298    }
299
300    // Set the delay to the next probe based on the time of our last probe
301    // and the specified delay.
302    fn schedule_next_probe(&mut self, delay: Duration) {
303        let Some(last_probe_instant) = self.last_probe else {
304            return;
305        };
306        let schedule_next = *last_probe_instant + delay;
307        self.schedule_probe
308            .reset(schedule_next.saturating_duration_since(Instant::now()));
309    }
310
311    // Adapt current confidence and NAT status to the status reported by the latest probe.
312    // Return the old status if it flipped.
313    fn handle_reported_status(&mut self, reported_status: NatStatus) -> Option<NatStatus> {
314        self.schedule_next_probe(self.config.retry_interval);
315
316        if matches!(reported_status, NatStatus::Unknown) {
317            return None;
318        }
319
320        if reported_status == *self.nat_status {
321            if *self.confidence < self.config.confidence_max {
322                *self.confidence += 1;
323            }
324            // Delay with (usually longer) refresh-interval.
325            if *self.confidence >= self.config.confidence_max {
326                self.schedule_next_probe(self.config.refresh_interval);
327            }
328            return None;
329        }
330
331        if reported_status.is_public() && self.nat_status.is_public() {
332            // Different address than the currently assumed public address was reported.
333            // Switch address, but don't report as flipped.
334            *self.nat_status = reported_status;
335            return None;
336        }
337        if *self.confidence > 0 {
338            // Reduce confidence but keep old status.
339            *self.confidence -= 1;
340            return None;
341        }
342
343        tracing::debug!(
344            old_status=?self.nat_status,
345            new_status=?reported_status,
346            "Flipped assumed NAT status"
347        );
348
349        let old_status = self.nat_status.clone();
350        *self.nat_status = reported_status;
351
352        Some(old_status)
353    }
354}
355
356impl From<Result<Multiaddr, ResponseError>> for NatStatus {
357    fn from(result: Result<Multiaddr, ResponseError>) -> Self {
358        match result {
359            Ok(addr) => NatStatus::Public(addr),
360            Err(ResponseError::DialError) => NatStatus::Private,
361            _ => NatStatus::Unknown,
362        }
363    }
364}