1use std::{
22 collections::HashMap,
23 iter,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use futures::{
29 future::{BoxFuture, FutureExt},
30 stream::{FuturesUnordered, StreamExt},
31};
32use libp2p_core::{transport::PortUse, Endpoint, Multiaddr, PeerRecord};
33use libp2p_identity::{Keypair, PeerId, SigningError};
34use libp2p_request_response::{OutboundRequestId, ProtocolSupport};
35use libp2p_swarm::{
36 ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, THandler,
37 THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
40use crate::codec::{
41 Cookie, ErrorCode, Message, Message::*, Namespace, NewRegistration, Registration, Ttl,
42};
43
/// Client-side rendezvous behaviour.
///
/// Wraps a request-response behaviour and keeps the bookkeeping needed to tie
/// each response back to the `register` / `discover` call that caused it.
pub struct Behaviour {
    /// Request-response behaviour (using this crate's codec) that carries every message.
    inner: libp2p_request_response::Behaviour<crate::codec::Codec>,

    /// Key pair passed to `PeerRecord::new` when a registration is built.
    keypair: Keypair,

    /// Register requests that were sent and not yet resolved, keyed by request id.
    /// The value is the rendezvous node the request went to and the namespace.
    waiting_for_register: HashMap<OutboundRequestId, (PeerId, Namespace)>,
    /// Discover requests that were sent and not yet resolved, keyed by request id.
    /// The value is the rendezvous node the request went to and the optional namespace.
    waiting_for_discovery: HashMap<OutboundRequestId, (PeerId, Option<Namespace>)>,

    /// Addresses taken from discover responses, keyed by the discovered peer and
    /// the namespace of its registration. Consulted when an outbound connection
    /// to a known peer is pending; an entry is dropped when its timer fires.
    discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,

    /// Registrations a rendezvous node has confirmed, keyed by (rendezvous node,
    /// namespace), with the TTL from the response. Replayed through `register`
    /// when the set of external addresses changes.
    registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,

    /// One timer per discovered registration; each resolves to the
    /// `(peer, namespace)` key once the registration's TTL (in seconds) has elapsed.
    expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,

    /// Our external addresses, updated from swarm events.
    external_addresses: ExternalAddresses,
}
66
67impl Behaviour {
68 pub fn new(keypair: Keypair) -> Self {
70 Self {
71 inner: libp2p_request_response::Behaviour::with_codec(
72 crate::codec::Codec::default(),
73 iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
74 libp2p_request_response::Config::default(),
75 ),
76 keypair,
77 waiting_for_register: Default::default(),
78 waiting_for_discovery: Default::default(),
79 discovered_peers: Default::default(),
80 registered_namespaces: Default::default(),
81 expiring_registrations: FuturesUnordered::from_iter(vec![
82 futures::future::pending().boxed()
83 ]),
84 external_addresses: Default::default(),
85 }
86 }
87
88 pub fn register(
94 &mut self,
95 namespace: Namespace,
96 rendezvous_node: PeerId,
97 ttl: Option<Ttl>,
98 ) -> Result<(), RegisterError> {
99 let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
100 if external_addresses.is_empty() {
101 return Err(RegisterError::NoExternalAddresses);
102 }
103
104 let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
105 let req_id = self.inner.send_request(
106 &rendezvous_node,
107 Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
108 );
109 self.waiting_for_register
110 .insert(req_id, (rendezvous_node, namespace));
111
112 Ok(())
113 }
114
115 pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
117 self.registered_namespaces
118 .retain(|(rz_node, ns), _| rz_node.ne(&rendezvous_node) && ns.ne(&namespace));
119
120 self.inner
121 .send_request(&rendezvous_node, Unregister(namespace));
122 }
123
124 pub fn discover(
132 &mut self,
133 namespace: Option<Namespace>,
134 cookie: Option<Cookie>,
135 limit: Option<u64>,
136 rendezvous_node: PeerId,
137 ) {
138 let req_id = self.inner.send_request(
139 &rendezvous_node,
140 Discover {
141 namespace: namespace.clone(),
142 cookie,
143 limit,
144 },
145 );
146
147 self.waiting_for_discovery
148 .insert(req_id, (rendezvous_node, namespace));
149 }
150}
151
/// Reasons why [`Behaviour::register`] can fail before a request is sent.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// No external address is known, so there is nothing to put into the peer record.
    #[error("We don't know about any externally reachable addresses of ours")]
    NoExternalAddresses,
    /// Building the peer record failed with a signing error.
    #[error("Failed to make a new PeerRecord")]
    FailedToMakeRecord(#[from] SigningError),
}
159
/// Events this behaviour reports to the swarm.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// A discover request was answered successfully.
    Discovered {
        /// Node the discover request was sent to.
        rendezvous_node: PeerId,
        /// Registrations contained in the response.
        registrations: Vec<Registration>,
        /// Cookie contained in the response.
        cookie: Cookie,
    },
    /// A discover request was answered with an error, or the request itself failed.
    DiscoverFailed {
        /// Node the discover request was sent to.
        rendezvous_node: PeerId,
        /// Namespace of the request, if one was given.
        namespace: Option<Namespace>,
        /// Error code from the response; `ErrorCode::Unavailable` when the request failed.
        error: ErrorCode,
    },
    /// A register request was answered successfully.
    Registered {
        /// Node the register request was sent to.
        rendezvous_node: PeerId,
        /// TTL contained in the response.
        ttl: Ttl,
        /// Namespace that was registered.
        namespace: Namespace,
    },
    /// A register request was answered with an error, or the request itself failed.
    RegisterFailed {
        /// Node the register request was sent to.
        rendezvous_node: PeerId,
        /// Namespace of the request.
        namespace: Namespace,
        /// Error code from the response; `ErrorCode::Unavailable` when the request failed.
        error: ErrorCode,
    },
    /// The timer of a discovered registration of `peer` fired; its cached
    /// addresses were dropped.
    Expired { peer: PeerId },
}
190
191impl NetworkBehaviour for Behaviour {
192 type ConnectionHandler = <libp2p_request_response::Behaviour<
193 crate::codec::Codec,
194 > as NetworkBehaviour>::ConnectionHandler;
195
196 type ToSwarm = Event;
197
198 fn handle_established_inbound_connection(
199 &mut self,
200 connection_id: ConnectionId,
201 peer: PeerId,
202 local_addr: &Multiaddr,
203 remote_addr: &Multiaddr,
204 ) -> Result<THandler<Self>, ConnectionDenied> {
205 self.inner.handle_established_inbound_connection(
206 connection_id,
207 peer,
208 local_addr,
209 remote_addr,
210 )
211 }
212
213 fn handle_established_outbound_connection(
214 &mut self,
215 connection_id: ConnectionId,
216 peer: PeerId,
217 addr: &Multiaddr,
218 role_override: Endpoint,
219 port_use: PortUse,
220 ) -> Result<THandler<Self>, ConnectionDenied> {
221 self.inner.handle_established_outbound_connection(
222 connection_id,
223 peer,
224 addr,
225 role_override,
226 port_use,
227 )
228 }
229
230 fn on_connection_handler_event(
231 &mut self,
232 peer_id: PeerId,
233 connection_id: ConnectionId,
234 event: THandlerOutEvent<Self>,
235 ) {
236 self.inner
237 .on_connection_handler_event(peer_id, connection_id, event);
238 }
239
240 fn on_swarm_event(&mut self, event: FromSwarm) {
241 let changed = self.external_addresses.on_swarm_event(&event);
242
243 self.inner.on_swarm_event(event);
244
245 if changed && self.external_addresses.iter().count() > 0 {
246 let registered = self.registered_namespaces.clone();
247 for ((rz_node, ns), ttl) in registered {
248 if let Err(e) = self.register(ns, rz_node, Some(ttl)) {
249 tracing::warn!("refreshing registration failed: {e}")
250 }
251 }
252 }
253 }
254
255 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
256 fn poll(
257 &mut self,
258 cx: &mut Context<'_>,
259 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
260 use libp2p_request_response as req_res;
261
262 loop {
263 match self.inner.poll(cx) {
264 Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
265 message:
266 req_res::Message::Response {
267 request_id,
268 response,
269 },
270 ..
271 })) => {
272 if let Some(event) = self.handle_response(&request_id, response) {
273 return Poll::Ready(ToSwarm::GenerateEvent(event));
274 }
275
276 continue; }
278 Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
279 request_id,
280 ..
281 })) => {
282 if let Some(event) = self.event_for_outbound_failure(&request_id) {
283 return Poll::Ready(ToSwarm::GenerateEvent(event));
284 }
285
286 continue; }
288 Poll::Ready(ToSwarm::GenerateEvent(
289 req_res::Event::InboundFailure { .. }
290 | req_res::Event::ResponseSent { .. }
291 | req_res::Event::Message {
292 message: req_res::Message::Request { .. },
293 ..
294 },
295 )) => {
296 unreachable!("rendezvous clients never receive requests")
297 }
298 Poll::Ready(other) => {
299 let new_to_swarm =
300 other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
301
302 return Poll::Ready(new_to_swarm);
303 }
304 Poll::Pending => {}
305 }
306
307 if let Poll::Ready(Some(expired_registration)) =
308 self.expiring_registrations.poll_next_unpin(cx)
309 {
310 self.discovered_peers.remove(&expired_registration);
311 return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
312 peer: expired_registration.0,
313 }));
314 }
315
316 return Poll::Pending;
317 }
318 }
319
320 fn handle_pending_outbound_connection(
321 &mut self,
322 _connection_id: ConnectionId,
323 maybe_peer: Option<PeerId>,
324 _addresses: &[Multiaddr],
325 _effective_role: Endpoint,
326 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
327 let peer = match maybe_peer {
328 None => return Ok(vec![]),
329 Some(peer) => peer,
330 };
331
332 let addresses = self
333 .discovered_peers
334 .iter()
335 .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses))
336 .flatten()
337 .cloned()
338 .collect();
339
340 Ok(addresses)
341 }
342}
343
344impl Behaviour {
345 fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option<Event> {
346 if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
347 return Some(Event::RegisterFailed {
348 rendezvous_node,
349 namespace,
350 error: ErrorCode::Unavailable,
351 });
352 };
353
354 if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
355 return Some(Event::DiscoverFailed {
356 rendezvous_node,
357 namespace,
358 error: ErrorCode::Unavailable,
359 });
360 };
361
362 None
363 }
364
365 fn handle_response(
366 &mut self,
367 request_id: &OutboundRequestId,
368 response: Message,
369 ) -> Option<Event> {
370 match response {
371 RegisterResponse(Ok(ttl)) => {
372 if let Some((rendezvous_node, namespace)) =
373 self.waiting_for_register.remove(request_id)
374 {
375 self.registered_namespaces
376 .insert((rendezvous_node, namespace.clone()), ttl);
377
378 return Some(Event::Registered {
379 rendezvous_node,
380 ttl,
381 namespace,
382 });
383 }
384
385 None
386 }
387 RegisterResponse(Err(error_code)) => {
388 if let Some((rendezvous_node, namespace)) =
389 self.waiting_for_register.remove(request_id)
390 {
391 return Some(Event::RegisterFailed {
392 rendezvous_node,
393 namespace,
394 error: error_code,
395 });
396 }
397
398 None
399 }
400 DiscoverResponse(Ok((registrations, cookie))) => {
401 if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
402 {
403 self.discovered_peers
404 .extend(registrations.iter().map(|registration| {
405 let peer_id = registration.record.peer_id();
406 let namespace = registration.namespace.clone();
407
408 let addresses = registration.record.addresses().to_vec();
409
410 ((peer_id, namespace), addresses)
411 }));
412
413 self.expiring_registrations
414 .extend(registrations.iter().cloned().map(|registration| {
415 async move {
416 futures_timer::Delay::new(Duration::from_secs(registration.ttl))
418 .await;
419
420 (registration.record.peer_id(), registration.namespace)
421 }
422 .boxed()
423 }));
424
425 return Some(Event::Discovered {
426 rendezvous_node,
427 registrations,
428 cookie,
429 });
430 }
431
432 None
433 }
434 DiscoverResponse(Err(error_code)) => {
435 if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
436 return Some(Event::DiscoverFailed {
437 rendezvous_node,
438 namespace: ns,
439 error: error_code,
440 });
441 }
442
443 None
444 }
445 _ => unreachable!("rendezvous clients never receive requests"),
446 }
447 }
448}