libp2p_autonat/v2/client/
behaviour.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    fmt::{Debug, Display, Formatter},
4    task::{Context, Poll},
5    time::Duration,
6};
7
8use either::Either;
9use futures::FutureExt;
10use futures_timer::Delay;
11use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
12use libp2p_identity::PeerId;
13use libp2p_swarm::{
14    behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionHandler,
15    ConnectionId, FromSwarm, NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm,
16};
17use rand::prelude::*;
18use rand_core::OsRng;
19
20use super::handler::{
21    dial_back::{self, IncomingNonce},
22    dial_request,
23};
24use crate::v2::{protocol::DialRequest, Nonce};
25
26#[derive(Debug, Clone, Copy)]
27pub struct Config {
28    /// How many candidates we will test at most.
29    pub(crate) max_candidates: usize,
30
31    /// The interval at which we will attempt to confirm candidates as external addresses.
32    pub(crate) probe_interval: Duration,
33}
34
35impl Config {
36    pub fn with_max_candidates(self, max_candidates: usize) -> Self {
37        Self {
38            max_candidates,
39            ..self
40        }
41    }
42
43    pub fn with_probe_interval(self, probe_interval: Duration) -> Self {
44        Self {
45            probe_interval,
46            ..self
47        }
48    }
49}
50
51impl Default for Config {
52    fn default() -> Self {
53        Self {
54            max_candidates: 10,
55            probe_interval: Duration::from_secs(5),
56        }
57    }
58}
59
60pub struct Behaviour<R = OsRng>
61where
62    R: RngCore + 'static,
63{
64    rng: R,
65    config: Config,
66    pending_events: VecDeque<
67        ToSwarm<
68            <Self as NetworkBehaviour>::ToSwarm,
69            <<Self as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::FromBehaviour,
70        >,
71    >,
72    address_candidates: HashMap<Multiaddr, AddressInfo>,
73    next_tick: Delay,
74    peer_info: HashMap<ConnectionId, ConnectionInfo>,
75}
76
77impl<R> NetworkBehaviour for Behaviour<R>
78where
79    R: RngCore + 'static,
80{
81    type ConnectionHandler = Either<dial_request::Handler, dial_back::Handler>;
82
83    type ToSwarm = Event;
84
85    fn handle_established_inbound_connection(
86        &mut self,
87        _: ConnectionId,
88        _: PeerId,
89        _: &Multiaddr,
90        _: &Multiaddr,
91    ) -> Result<<Self as NetworkBehaviour>::ConnectionHandler, ConnectionDenied> {
92        Ok(Either::Right(dial_back::Handler::new()))
93    }
94
95    fn handle_established_outbound_connection(
96        &mut self,
97        _: ConnectionId,
98        _: PeerId,
99        _: &Multiaddr,
100        _: Endpoint,
101        _: PortUse,
102    ) -> Result<<Self as NetworkBehaviour>::ConnectionHandler, ConnectionDenied> {
103        Ok(Either::Left(dial_request::Handler::new()))
104    }
105
106    fn on_swarm_event(&mut self, event: FromSwarm) {
107        match event {
108            FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
109                self.address_candidates
110                    .entry(addr.clone())
111                    .or_default()
112                    .score += 1;
113            }
114            FromSwarm::ConnectionEstablished(ConnectionEstablished {
115                peer_id,
116                connection_id,
117                endpoint: _,
118                ..
119            }) => {
120                self.peer_info.insert(
121                    connection_id,
122                    ConnectionInfo {
123                        peer_id,
124                        supports_autonat: false,
125                    },
126                );
127            }
128            FromSwarm::ConnectionClosed(ConnectionClosed {
129                peer_id,
130                connection_id,
131                ..
132            }) => {
133                let info = self
134                    .peer_info
135                    .remove(&connection_id)
136                    .expect("inconsistent state");
137
138                if info.supports_autonat {
139                    tracing::debug!(%peer_id, "Disconnected from AutoNAT server");
140                }
141            }
142            _ => {}
143        }
144    }
145
146    fn on_connection_handler_event(
147        &mut self,
148        peer_id: PeerId,
149        connection_id: ConnectionId,
150        event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
151    ) {
152        let (nonce, outcome) = match event {
153            Either::Right(IncomingNonce { nonce, sender }) => {
154                let Some((_, info)) = self
155                    .address_candidates
156                    .iter_mut()
157                    .find(|(_, info)| info.is_pending_with_nonce(nonce))
158                else {
159                    let _ = sender.send(Err(std::io::Error::new(
160                        std::io::ErrorKind::InvalidData,
161                        format!("Received unexpected nonce: {nonce} from {peer_id}"),
162                    )));
163                    return;
164                };
165
166                info.status = TestStatus::Received(nonce);
167                tracing::debug!(%peer_id, %nonce, "Successful dial-back");
168
169                let _ = sender.send(Ok(()));
170
171                return;
172            }
173            Either::Left(dial_request::ToBehaviour::PeerHasServerSupport) => {
174                self.peer_info
175                    .get_mut(&connection_id)
176                    .expect("inconsistent state")
177                    .supports_autonat = true;
178                return;
179            }
180            Either::Left(dial_request::ToBehaviour::TestOutcome { nonce, outcome }) => {
181                (nonce, outcome)
182            }
183        };
184
185        let ((tested_addr, bytes_sent), result) = match outcome {
186            Ok(address) => {
187                let received_dial_back = self
188                    .address_candidates
189                    .iter_mut()
190                    .any(|(_, info)| info.is_received_with_nonce(nonce));
191
192                if !received_dial_back {
193                    tracing::warn!(
194                        %peer_id,
195                        %nonce,
196                        "Server reported reachbility but we never received a dial-back"
197                    );
198                    return;
199                }
200
201                self.pending_events
202                    .push_back(ToSwarm::ExternalAddrConfirmed(address.0.clone()));
203
204                (address, Ok(()))
205            }
206            Err(dial_request::Error::UnsupportedProtocol) => {
207                self.peer_info
208                    .get_mut(&connection_id)
209                    .expect("inconsistent state")
210                    .supports_autonat = false;
211
212                self.reset_status_to(nonce, TestStatus::Untested); // Reset so it will be tried again.
213
214                return;
215            }
216            Err(dial_request::Error::Io(e)) => {
217                tracing::debug!(
218                    %peer_id,
219                    %nonce,
220                    "Failed to complete AutoNAT probe: {e}"
221                );
222
223                self.reset_status_to(nonce, TestStatus::Untested); // Reset so it will be tried again.
224
225                return;
226            }
227            Err(dial_request::Error::AddressNotReachable {
228                address,
229                bytes_sent,
230                error,
231            }) => {
232                self.reset_status_to(nonce, TestStatus::Failed);
233
234                ((address, bytes_sent), Err(error))
235            }
236        };
237
238        self.pending_events.push_back(ToSwarm::GenerateEvent(Event {
239            tested_addr,
240            bytes_sent,
241            server: peer_id,
242            result: result.map_err(|e| Error { inner: e }),
243        }));
244    }
245
246    fn poll(
247        &mut self,
248        cx: &mut Context<'_>,
249    ) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
250    {
251        loop {
252            if let Some(event) = self.pending_events.pop_front() {
253                return Poll::Ready(event);
254            }
255
256            if self.next_tick.poll_unpin(cx).is_ready() {
257                self.next_tick.reset(self.config.probe_interval);
258
259                self.issue_dial_requests_for_untested_candidates();
260                continue;
261            }
262
263            return Poll::Pending;
264        }
265    }
266}
267
268impl<R> Behaviour<R>
269where
270    R: RngCore + 'static,
271{
272    pub fn new(rng: R, config: Config) -> Self {
273        Self {
274            rng,
275            next_tick: Delay::new(config.probe_interval),
276            config,
277            pending_events: VecDeque::new(),
278            address_candidates: HashMap::new(),
279            peer_info: HashMap::new(),
280        }
281    }
282
283    /// Issues dial requests to random AutoNAT servers for the most frequently reported, untested
284    /// candidates.
285    ///
286    /// In the current implementation, we only send a single address to each AutoNAT server.
287    /// This spreads our candidates out across all servers we are connected to which should give us
288    /// pretty fast feedback on all of them.
289    fn issue_dial_requests_for_untested_candidates(&mut self) {
290        for addr in self.untested_candidates() {
291            let Some((conn_id, peer_id)) = self.random_autonat_server() else {
292                tracing::debug!("Not connected to any AutoNAT servers");
293                return;
294            };
295
296            let nonce = self.rng.gen();
297            self.address_candidates
298                .get_mut(&addr)
299                .expect("only emit candidates")
300                .status = TestStatus::Pending(nonce);
301
302            self.pending_events.push_back(ToSwarm::NotifyHandler {
303                peer_id,
304                handler: NotifyHandler::One(conn_id),
305                event: Either::Left(DialRequest {
306                    nonce,
307                    addrs: vec![addr],
308                }),
309            });
310        }
311    }
312
313    /// Returns all untested candidates, sorted by the frequency they were reported at.
314    ///
315    /// More frequently reported candidates are considered to more likely be external addresses and
316    /// thus tested first.
317    fn untested_candidates(&self) -> impl Iterator<Item = Multiaddr> {
318        let mut entries = self
319            .address_candidates
320            .iter()
321            .filter(|(_, info)| info.status == TestStatus::Untested)
322            .map(|(addr, count)| (addr.clone(), *count))
323            .collect::<Vec<_>>();
324
325        entries.sort_unstable_by_key(|(_, info)| info.score);
326
327        if entries.is_empty() {
328            tracing::debug!("No untested address candidates");
329        }
330
331        entries
332            .into_iter()
333            .rev() // `sort_unstable` is ascending
334            .take(self.config.max_candidates)
335            .map(|(addr, _)| addr)
336    }
337
338    /// Chooses an active connection to one of our peers that reported support for the
339    /// [`DIAL_REQUEST_PROTOCOL`](crate::v2::DIAL_REQUEST_PROTOCOL) protocol.
340    fn random_autonat_server(&mut self) -> Option<(ConnectionId, PeerId)> {
341        let (conn_id, info) = self
342            .peer_info
343            .iter()
344            .filter(|(_, info)| info.supports_autonat)
345            .choose(&mut self.rng)?;
346
347        Some((*conn_id, info.peer_id))
348    }
349
350    fn reset_status_to(&mut self, nonce: Nonce, new_status: TestStatus) {
351        let Some((_, info)) = self
352            .address_candidates
353            .iter_mut()
354            .find(|(_, i)| i.is_pending_with_nonce(nonce) || i.is_received_with_nonce(nonce))
355        else {
356            return;
357        };
358
359        info.status = new_status;
360    }
361
362    // FIXME: We don't want test-only APIs in our public API.
363    #[doc(hidden)]
364    pub fn validate_addr(&mut self, addr: &Multiaddr) {
365        if let Some(info) = self.address_candidates.get_mut(addr) {
366            info.status = TestStatus::Received(self.rng.next_u64());
367        }
368    }
369}
370
371impl Default for Behaviour<OsRng> {
372    fn default() -> Self {
373        Self::new(OsRng, Config::default())
374    }
375}
376
377pub struct Error {
378    pub(crate) inner: dial_request::DialBackError,
379}
380
381impl Display for Error {
382    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
383        Display::fmt(&self.inner, f)
384    }
385}
386
387impl Debug for Error {
388    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
389        Debug::fmt(&self.inner, f)
390    }
391}
392
393#[derive(Debug)]
394pub struct Event {
395    /// The address that was selected for testing.
396    pub tested_addr: Multiaddr,
397    /// The amount of data that was sent to the server.
398    /// Is 0 if it wasn't necessary to send any data.
399    /// Otherwise it's a number between 30.000 and 100.000.
400    pub bytes_sent: usize,
401    /// The peer id of the server that was selected for testing.
402    pub server: PeerId,
403    /// The result of the test. If the test was successful, this is `Ok(())`.
404    /// Otherwise it's an error.
405    pub result: Result<(), Error>,
406}
407
408struct ConnectionInfo {
409    peer_id: PeerId,
410    supports_autonat: bool,
411}
412
413#[derive(Copy, Clone, Default)]
414struct AddressInfo {
415    score: usize,
416    status: TestStatus,
417}
418
419impl AddressInfo {
420    fn is_pending_with_nonce(&self, nonce: Nonce) -> bool {
421        match self.status {
422            TestStatus::Pending(c) => c == nonce,
423            _ => false,
424        }
425    }
426
427    fn is_received_with_nonce(&self, nonce: Nonce) -> bool {
428        match self.status {
429            TestStatus::Received(c) => c == nonce,
430            _ => false,
431        }
432    }
433}
434
435#[derive(Clone, Copy, Default, PartialEq)]
436enum TestStatus {
437    #[default]
438    Untested,
439    Pending(Nonce),
440    Failed,
441    Received(Nonce),
442}