1use std::{
2 collections::{HashMap, VecDeque},
3 fmt::{Debug, Display, Formatter},
4 task::{Context, Poll},
5 time::Duration,
6};
7
8use either::Either;
9use futures::FutureExt;
10use futures_timer::Delay;
11use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
12use libp2p_identity::PeerId;
13use libp2p_swarm::{
14 behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionHandler,
15 ConnectionId, FromSwarm, NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm,
16};
17use rand::prelude::*;
18use rand_core::OsRng;
19
20use super::handler::{
21 dial_back::{self, IncomingNonce},
22 dial_request,
23};
24use crate::v2::{protocol::DialRequest, Nonce};
25
/// Tunables for the AutoNAT client behaviour defined in this file.
///
/// Values are set through the `with_*` builder methods or taken from `Default`.
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Upper bound on how many untested address candidates are selected in one probe round.
    pub(crate) max_candidates: usize,

    /// Delay between two probe rounds; used to arm and re-arm the behaviour's tick timer.
    pub(crate) probe_interval: Duration,
}
34
35impl Config {
36 pub fn with_max_candidates(self, max_candidates: usize) -> Self {
37 Self {
38 max_candidates,
39 ..self
40 }
41 }
42
43 pub fn with_probe_interval(self, probe_interval: Duration) -> Self {
44 Self {
45 probe_interval,
46 ..self
47 }
48 }
49}
50
51impl Default for Config {
52 fn default() -> Self {
53 Self {
54 max_candidates: 10,
55 probe_interval: Duration::from_secs(5),
56 }
57 }
58}
59
/// Client-side AutoNAT behaviour.
///
/// Collects external address candidates reported by the swarm and, on every tick, asks
/// connected peers that advertised AutoNAT server support to dial those candidates back.
///
/// `R` is the random number generator used to pick a server and to draw nonces.
pub struct Behaviour<R = OsRng>
where
    R: RngCore + 'static,
{
    /// Randomness source for server selection and nonce generation.
    rng: R,
    /// Candidate limit and probe interval.
    config: Config,
    /// Events queued for the swarm; `poll` drains them front to back.
    pending_events: VecDeque<
        ToSwarm<
            <Self as NetworkBehaviour>::ToSwarm,
            <<Self as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::FromBehaviour,
        >,
    >,
    /// Every address reported as an external address candidate, with its score and test status.
    address_candidates: HashMap<Multiaddr, AddressInfo>,
    /// Timer that triggers the next probe round.
    next_tick: Delay,
    /// Per-connection bookkeeping: the remote peer and whether it signalled server support.
    peer_info: HashMap<ConnectionId, ConnectionInfo>,
}
76
77impl<R> NetworkBehaviour for Behaviour<R>
78where
79 R: RngCore + 'static,
80{
81 type ConnectionHandler = Either<dial_request::Handler, dial_back::Handler>;
82
83 type ToSwarm = Event;
84
85 fn handle_established_inbound_connection(
86 &mut self,
87 _: ConnectionId,
88 _: PeerId,
89 _: &Multiaddr,
90 _: &Multiaddr,
91 ) -> Result<<Self as NetworkBehaviour>::ConnectionHandler, ConnectionDenied> {
92 Ok(Either::Right(dial_back::Handler::new()))
93 }
94
95 fn handle_established_outbound_connection(
96 &mut self,
97 _: ConnectionId,
98 _: PeerId,
99 _: &Multiaddr,
100 _: Endpoint,
101 _: PortUse,
102 ) -> Result<<Self as NetworkBehaviour>::ConnectionHandler, ConnectionDenied> {
103 Ok(Either::Left(dial_request::Handler::new()))
104 }
105
106 fn on_swarm_event(&mut self, event: FromSwarm) {
107 match event {
108 FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
109 self.address_candidates
110 .entry(addr.clone())
111 .or_default()
112 .score += 1;
113 }
114 FromSwarm::ConnectionEstablished(ConnectionEstablished {
115 peer_id,
116 connection_id,
117 endpoint: _,
118 ..
119 }) => {
120 self.peer_info.insert(
121 connection_id,
122 ConnectionInfo {
123 peer_id,
124 supports_autonat: false,
125 },
126 );
127 }
128 FromSwarm::ConnectionClosed(ConnectionClosed {
129 peer_id,
130 connection_id,
131 ..
132 }) => {
133 let info = self
134 .peer_info
135 .remove(&connection_id)
136 .expect("inconsistent state");
137
138 if info.supports_autonat {
139 tracing::debug!(%peer_id, "Disconnected from AutoNAT server");
140 }
141 }
142 _ => {}
143 }
144 }
145
146 fn on_connection_handler_event(
147 &mut self,
148 peer_id: PeerId,
149 connection_id: ConnectionId,
150 event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
151 ) {
152 let (nonce, outcome) = match event {
153 Either::Right(IncomingNonce { nonce, sender }) => {
154 let Some((_, info)) = self
155 .address_candidates
156 .iter_mut()
157 .find(|(_, info)| info.is_pending_with_nonce(nonce))
158 else {
159 let _ = sender.send(Err(std::io::Error::new(
160 std::io::ErrorKind::InvalidData,
161 format!("Received unexpected nonce: {nonce} from {peer_id}"),
162 )));
163 return;
164 };
165
166 info.status = TestStatus::Received(nonce);
167 tracing::debug!(%peer_id, %nonce, "Successful dial-back");
168
169 let _ = sender.send(Ok(()));
170
171 return;
172 }
173 Either::Left(dial_request::ToBehaviour::PeerHasServerSupport) => {
174 self.peer_info
175 .get_mut(&connection_id)
176 .expect("inconsistent state")
177 .supports_autonat = true;
178 return;
179 }
180 Either::Left(dial_request::ToBehaviour::TestOutcome { nonce, outcome }) => {
181 (nonce, outcome)
182 }
183 };
184
185 let ((tested_addr, bytes_sent), result) = match outcome {
186 Ok(address) => {
187 let received_dial_back = self
188 .address_candidates
189 .iter_mut()
190 .any(|(_, info)| info.is_received_with_nonce(nonce));
191
192 if !received_dial_back {
193 tracing::warn!(
194 %peer_id,
195 %nonce,
196 "Server reported reachbility but we never received a dial-back"
197 );
198 return;
199 }
200
201 self.pending_events
202 .push_back(ToSwarm::ExternalAddrConfirmed(address.0.clone()));
203
204 (address, Ok(()))
205 }
206 Err(dial_request::Error::UnsupportedProtocol) => {
207 self.peer_info
208 .get_mut(&connection_id)
209 .expect("inconsistent state")
210 .supports_autonat = false;
211
212 self.reset_status_to(nonce, TestStatus::Untested); return;
215 }
216 Err(dial_request::Error::Io(e)) => {
217 tracing::debug!(
218 %peer_id,
219 %nonce,
220 "Failed to complete AutoNAT probe: {e}"
221 );
222
223 self.reset_status_to(nonce, TestStatus::Untested); return;
226 }
227 Err(dial_request::Error::AddressNotReachable {
228 address,
229 bytes_sent,
230 error,
231 }) => {
232 self.reset_status_to(nonce, TestStatus::Failed);
233
234 ((address, bytes_sent), Err(error))
235 }
236 };
237
238 self.pending_events.push_back(ToSwarm::GenerateEvent(Event {
239 tested_addr,
240 bytes_sent,
241 server: peer_id,
242 result: result.map_err(|e| Error { inner: e }),
243 }));
244 }
245
246 fn poll(
247 &mut self,
248 cx: &mut Context<'_>,
249 ) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
250 {
251 loop {
252 if let Some(event) = self.pending_events.pop_front() {
253 return Poll::Ready(event);
254 }
255
256 if self.next_tick.poll_unpin(cx).is_ready() {
257 self.next_tick.reset(self.config.probe_interval);
258
259 self.issue_dial_requests_for_untested_candidates();
260 continue;
261 }
262
263 return Poll::Pending;
264 }
265 }
266}
267
268impl<R> Behaviour<R>
269where
270 R: RngCore + 'static,
271{
272 pub fn new(rng: R, config: Config) -> Self {
273 Self {
274 rng,
275 next_tick: Delay::new(config.probe_interval),
276 config,
277 pending_events: VecDeque::new(),
278 address_candidates: HashMap::new(),
279 peer_info: HashMap::new(),
280 }
281 }
282
283 fn issue_dial_requests_for_untested_candidates(&mut self) {
290 for addr in self.untested_candidates() {
291 let Some((conn_id, peer_id)) = self.random_autonat_server() else {
292 tracing::debug!("Not connected to any AutoNAT servers");
293 return;
294 };
295
296 let nonce = self.rng.gen();
297 self.address_candidates
298 .get_mut(&addr)
299 .expect("only emit candidates")
300 .status = TestStatus::Pending(nonce);
301
302 self.pending_events.push_back(ToSwarm::NotifyHandler {
303 peer_id,
304 handler: NotifyHandler::One(conn_id),
305 event: Either::Left(DialRequest {
306 nonce,
307 addrs: vec![addr],
308 }),
309 });
310 }
311 }
312
313 fn untested_candidates(&self) -> impl Iterator<Item = Multiaddr> {
318 let mut entries = self
319 .address_candidates
320 .iter()
321 .filter(|(_, info)| info.status == TestStatus::Untested)
322 .map(|(addr, count)| (addr.clone(), *count))
323 .collect::<Vec<_>>();
324
325 entries.sort_unstable_by_key(|(_, info)| info.score);
326
327 if entries.is_empty() {
328 tracing::debug!("No untested address candidates");
329 }
330
331 entries
332 .into_iter()
333 .rev() .take(self.config.max_candidates)
335 .map(|(addr, _)| addr)
336 }
337
338 fn random_autonat_server(&mut self) -> Option<(ConnectionId, PeerId)> {
341 let (conn_id, info) = self
342 .peer_info
343 .iter()
344 .filter(|(_, info)| info.supports_autonat)
345 .choose(&mut self.rng)?;
346
347 Some((*conn_id, info.peer_id))
348 }
349
350 fn reset_status_to(&mut self, nonce: Nonce, new_status: TestStatus) {
351 let Some((_, info)) = self
352 .address_candidates
353 .iter_mut()
354 .find(|(_, i)| i.is_pending_with_nonce(nonce) || i.is_received_with_nonce(nonce))
355 else {
356 return;
357 };
358
359 info.status = new_status;
360 }
361
362 #[doc(hidden)]
364 pub fn validate_addr(&mut self, addr: &Multiaddr) {
365 if let Some(info) = self.address_candidates.get_mut(addr) {
366 info.status = TestStatus::Received(self.rng.next_u64());
367 }
368 }
369}
370
371impl Default for Behaviour<OsRng> {
372 fn default() -> Self {
373 Self::new(OsRng, Config::default())
374 }
375}
376
/// Error carried in `Event::result` when a server reported the probed address as not
/// reachable.
///
/// Opaque wrapper around the dial-request handler's dial-back error.
pub struct Error {
    /// The underlying error as reported by the dial-request handler.
    pub(crate) inner: dial_request::DialBackError,
}
380
381impl Display for Error {
382 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
383 Display::fmt(&self.inner, f)
384 }
385}
386
387impl Debug for Error {
388 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
389 Debug::fmt(&self.inner, f)
390 }
391}
392
/// Outcome of a single address probe, handed to the swarm as a generated event.
#[derive(Debug)]
pub struct Event {
    /// The address this outcome refers to, as reported by the dial-request handler.
    pub tested_addr: Multiaddr,
    /// Byte count reported by the dial-request handler together with the tested address.
    // NOTE(review): presumably the amount of data sent to the server during the probe —
    // confirm against the dial-request handler.
    pub bytes_sent: usize,
    /// The peer whose connection delivered this outcome, i.e. the server that ran the probe.
    pub server: PeerId,
    /// `Ok(())` when the server reported success and the matching dial-back was received;
    /// `Err` when the server reported the address as not reachable.
    pub result: Result<(), Error>,
}
407
/// Bookkeeping for one established connection.
struct ConnectionInfo {
    /// The remote peer on this connection.
    peer_id: PeerId,
    /// Whether the connection's handler reported that the peer supports the AutoNAT server
    /// protocol. Starts as `false` and is cleared again on an `UnsupportedProtocol` outcome.
    supports_autonat: bool,
}
412
/// Bookkeeping for one external address candidate.
#[derive(Copy, Clone, Default)]
struct AddressInfo {
    /// Number of times the swarm reported this address as a candidate; higher scores are
    /// probed first.
    score: usize,
    /// Where this candidate currently stands in the probe cycle.
    status: TestStatus,
}
418
419impl AddressInfo {
420 fn is_pending_with_nonce(&self, nonce: Nonce) -> bool {
421 match self.status {
422 TestStatus::Pending(c) => c == nonce,
423 _ => false,
424 }
425 }
426
427 fn is_received_with_nonce(&self, nonce: Nonce) -> bool {
428 match self.status {
429 TestStatus::Received(c) => c == nonce,
430 _ => false,
431 }
432 }
433}
434
/// Probe state of a single address candidate.
#[derive(Clone, Copy, Default, PartialEq)]
enum TestStatus {
    /// Not probed yet, or the last probe was inconclusive; eligible for the next probe round.
    #[default]
    Untested,
    /// A dial request carrying this nonce has been queued; the dial-back is still awaited.
    Pending(Nonce),
    /// A server reported the address as not reachable; it is not selected for probing again.
    Failed,
    /// A dial-back carrying this nonce was received, or the address was marked via
    /// `validate_addr`.
    Received(Nonce),
}