libp2p_autonat/v1/behaviour/
as_client.rs1use std::{
22 collections::{HashMap, HashSet, VecDeque},
23 task::{Context, Poll},
24 time::Duration,
25};
26
27use futures::FutureExt;
28use futures_timer::Delay;
29use libp2p_core::Multiaddr;
30use libp2p_identity::PeerId;
31use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId};
32use libp2p_swarm::{ConnectionId, ListenAddresses, ToSwarm};
33use rand::{seq::SliceRandom, thread_rng};
34use web_time::Instant;
35
36use super::{
37 Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus,
38 ProbeId,
39};
40use crate::ResponseError;
41
42#[derive(Debug)]
44pub enum OutboundProbeError {
45 NoServer,
48 NoAddresses,
51 OutboundRequest(OutboundFailure),
53 Response(ResponseError),
55}
56
57#[derive(Debug)]
58pub enum OutboundProbeEvent {
59 Request {
61 probe_id: ProbeId,
62 peer: PeerId,
64 },
65 Response {
67 probe_id: ProbeId,
68 peer: PeerId,
70 address: Multiaddr,
72 },
73 Error {
76 probe_id: ProbeId,
77 peer: Option<PeerId>,
80 error: OutboundProbeError,
81 },
82}
83
84pub(crate) struct AsClient<'a> {
86 pub(crate) inner: &'a mut request_response::Behaviour<AutoNatCodec>,
87 pub(crate) local_peer_id: PeerId,
88 pub(crate) config: &'a Config,
89 pub(crate) connected: &'a HashMap<PeerId, HashMap<ConnectionId, Option<Multiaddr>>>,
90 pub(crate) probe_id: &'a mut ProbeId,
91 pub(crate) servers: &'a HashSet<PeerId>,
92 pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>,
93 pub(crate) nat_status: &'a mut NatStatus,
94 pub(crate) confidence: &'a mut usize,
95 pub(crate) ongoing_outbound: &'a mut HashMap<OutboundRequestId, ProbeId>,
96 pub(crate) last_probe: &'a mut Option<Instant>,
97 pub(crate) schedule_probe: &'a mut Delay,
98 pub(crate) listen_addresses: &'a ListenAddresses,
99 pub(crate) other_candidates: &'a HashSet<Multiaddr>,
100}
101
102impl HandleInnerEvent for AsClient<'_> {
103 fn handle_event(
104 &mut self,
105 event: request_response::Event<DialRequest, DialResponse>,
106 ) -> VecDeque<Action> {
107 match event {
108 request_response::Event::Message {
109 peer,
110 message:
111 request_response::Message::Response {
112 request_id,
113 response,
114 },
115 ..
116 } => {
117 tracing::debug!(?response, "Outbound dial-back request returned response");
118
119 let probe_id = self
120 .ongoing_outbound
121 .remove(&request_id)
122 .expect("OutboundRequestId exists.");
123
124 let event = match response.result.clone() {
125 Ok(address) => OutboundProbeEvent::Response {
126 probe_id,
127 peer,
128 address,
129 },
130 Err(e) => OutboundProbeEvent::Error {
131 probe_id,
132 peer: Some(peer),
133 error: OutboundProbeError::Response(e),
134 },
135 };
136
137 let mut actions = VecDeque::with_capacity(3);
138
139 actions.push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event)));
140
141 if let Some(old) = self.handle_reported_status(response.result.clone().into()) {
142 actions.push_back(ToSwarm::GenerateEvent(Event::StatusChanged {
143 old,
144 new: self.nat_status.clone(),
145 }));
146 }
147
148 if let Ok(address) = response.result {
149 actions.push_back(ToSwarm::ExternalAddrConfirmed(address));
150 }
151
152 actions
153 }
154 request_response::Event::OutboundFailure {
155 peer,
156 error,
157 request_id,
158 ..
159 } => {
160 tracing::debug!(
161 %peer,
162 "Outbound Failure {} when on dial-back request to peer.",
163 error,
164 );
165 let probe_id = self
166 .ongoing_outbound
167 .remove(&request_id)
168 .unwrap_or_else(|| self.probe_id.next());
169
170 self.schedule_probe.reset(Duration::ZERO);
171
172 VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe(
173 OutboundProbeEvent::Error {
174 probe_id,
175 peer: Some(peer),
176 error: OutboundProbeError::OutboundRequest(error),
177 },
178 ))])
179 }
180 _ => VecDeque::default(),
181 }
182 }
183}
184
185impl AsClient<'_> {
186 pub(crate) fn poll_auto_probe(&mut self, cx: &mut Context<'_>) -> Poll<OutboundProbeEvent> {
187 match self.schedule_probe.poll_unpin(cx) {
188 Poll::Ready(()) => {
189 self.schedule_probe.reset(self.config.retry_interval);
190
191 let addresses = self
192 .other_candidates
193 .iter()
194 .chain(self.listen_addresses.iter())
195 .cloned()
196 .collect();
197
198 let probe_id = self.probe_id.next();
199 let event = match self.do_probe(probe_id, addresses) {
200 Ok(peer) => OutboundProbeEvent::Request { probe_id, peer },
201 Err(error) => {
202 self.handle_reported_status(NatStatus::Unknown);
203 OutboundProbeEvent::Error {
204 probe_id,
205 peer: None,
206 error,
207 }
208 }
209 };
210 Poll::Ready(event)
211 }
212 Poll::Pending => Poll::Pending,
213 }
214 }
215
216 pub(crate) fn on_inbound_connection(&mut self) {
218 if *self.confidence == self.config.confidence_max {
219 if self.nat_status.is_public() {
220 self.schedule_next_probe(self.config.refresh_interval * 2);
221 } else {
222 self.schedule_next_probe(self.config.refresh_interval / 5);
223 }
224 }
225 }
226
227 pub(crate) fn on_new_address(&mut self) {
228 if !self.nat_status.is_public() {
229 if *self.confidence > 0 {
231 *self.confidence -= 1;
232 }
233 self.schedule_next_probe(self.config.retry_interval);
234 }
235 }
236
237 pub(crate) fn on_expired_address(&mut self, addr: &Multiaddr) {
238 if let NatStatus::Public(public_address) = self.nat_status {
239 if public_address == addr {
240 *self.confidence = 0;
241 *self.nat_status = NatStatus::Unknown;
242 self.schedule_next_probe(Duration::ZERO);
243 }
244 }
245 }
246
247 fn random_server(&mut self) -> Option<PeerId> {
249 let i = self.throttled_servers.partition_point(|(_, time)| {
251 *time + self.config.throttle_server_period < Instant::now()
252 });
253 self.throttled_servers.drain(..i);
254
255 let mut servers: Vec<&PeerId> = self.servers.iter().collect();
256
257 if self.config.use_connected {
258 servers.extend(self.connected.iter().filter_map(|(id, addrs)| {
259 addrs.values().any(|a| a.is_some()).then_some(id)
263 }));
264 }
265
266 servers.retain(|s| !self.throttled_servers.iter().any(|(id, _)| s == &id));
267
268 servers.choose(&mut thread_rng()).map(|&&p| p)
269 }
270
271 fn do_probe(
275 &mut self,
276 probe_id: ProbeId,
277 addresses: Vec<Multiaddr>,
278 ) -> Result<PeerId, OutboundProbeError> {
279 let _ = self.last_probe.insert(Instant::now());
280 if addresses.is_empty() {
281 tracing::debug!("Outbound dial-back request aborted: No dial-back addresses");
282 return Err(OutboundProbeError::NoAddresses);
283 }
284
285 let server = self.random_server().ok_or(OutboundProbeError::NoServer)?;
286
287 let request_id = self.inner.send_request(
288 &server,
289 DialRequest {
290 peer_id: self.local_peer_id,
291 addresses,
292 },
293 );
294 self.throttled_servers.push((server, Instant::now()));
295 tracing::debug!(peer=%server, "Send dial-back request to peer");
296 self.ongoing_outbound.insert(request_id, probe_id);
297 Ok(server)
298 }
299
300 fn schedule_next_probe(&mut self, delay: Duration) {
303 let Some(last_probe_instant) = self.last_probe else {
304 return;
305 };
306 let schedule_next = *last_probe_instant + delay;
307 self.schedule_probe
308 .reset(schedule_next.saturating_duration_since(Instant::now()));
309 }
310
311 fn handle_reported_status(&mut self, reported_status: NatStatus) -> Option<NatStatus> {
314 self.schedule_next_probe(self.config.retry_interval);
315
316 if matches!(reported_status, NatStatus::Unknown) {
317 return None;
318 }
319
320 if reported_status == *self.nat_status {
321 if *self.confidence < self.config.confidence_max {
322 *self.confidence += 1;
323 }
324 if *self.confidence >= self.config.confidence_max {
326 self.schedule_next_probe(self.config.refresh_interval);
327 }
328 return None;
329 }
330
331 if reported_status.is_public() && self.nat_status.is_public() {
332 *self.nat_status = reported_status;
335 return None;
336 }
337 if *self.confidence > 0 {
338 *self.confidence -= 1;
340 return None;
341 }
342
343 tracing::debug!(
344 old_status=?self.nat_status,
345 new_status=?reported_status,
346 "Flipped assumed NAT status"
347 );
348
349 let old_status = self.nat_status.clone();
350 *self.nat_status = reported_status;
351
352 Some(old_status)
353 }
354}
355
356impl From<Result<Multiaddr, ResponseError>> for NatStatus {
357 fn from(result: Result<Multiaddr, ResponseError>) -> Self {
358 match result {
359 Ok(addr) => NatStatus::Public(addr),
360 Err(ResponseError::DialError) => NatStatus::Private,
361 _ => NatStatus::Unknown,
362 }
363 }
364}