1pub(crate) mod handler;
24pub(crate) mod rate_limiter;
25use std::{
26 collections::{hash_map, HashMap, HashSet, VecDeque},
27 num::NonZeroU32,
28 ops::Add,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use either::Either;
34use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37 behaviour::{ConnectionClosed, FromSwarm},
38 dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use web_time::Instant;
42
43use crate::{
44 behaviour::handler::Handler,
45 multiaddr_ext::MultiaddrExt,
46 proto,
47 protocol::{inbound_hop, outbound_stop},
48};
49
/// Configuration for the relay [`Behaviour`].
///
/// Groups the reservation limits, the circuit limits and the rate limiters that the
/// behaviour consults when handling requests, together with the values it forwards to
/// every connection handler through `handler::Config`.
pub struct Config {
    /// Cap on the total number of reservations tracked across all peers; requests are
    /// denied once the tracked total has reached this value.
    pub max_reservations: usize,
    /// Limit on the number of reservations a single peer may hold. Only consulted for
    /// new (non-renewal) reservation requests.
    pub max_reservations_per_peer: usize,
    /// Reservation duration forwarded to each handler via `handler::Config`.
    pub reservation_duration: Duration,
    /// Rate limiters consulted for every reservation request. The request is denied
    /// unless every limiter admits it.
    pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,

    /// Cap on the total number of tracked circuits; circuit requests are denied once
    /// the tracked total has reached this value.
    pub max_circuits: usize,
    /// Limit on the number of circuits involving the requesting peer, counted whether
    /// that peer is the source or the destination of a circuit.
    pub max_circuits_per_peer: usize,
    /// Maximum circuit duration forwarded to each handler via `handler::Config`.
    // NOTE(review): presumably enforced per circuit inside the handler — confirm there.
    pub max_circuit_duration: Duration,
    /// Maximum circuit byte count forwarded to each handler via `handler::Config`.
    // NOTE(review): presumably enforced per circuit inside the handler — confirm there.
    pub max_circuit_bytes: u64,
    /// Rate limiters consulted for every circuit request, keyed on the requesting
    /// (source) peer and its remote address. The request is denied unless every
    /// limiter admits it.
    pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
}
67
68impl Config {
69 pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
70 self.reservation_rate_limiters
71 .push(rate_limiter::new_per_peer(
72 rate_limiter::GenericRateLimiterConfig { limit, interval },
73 ));
74 self
75 }
76
77 pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
78 self.circuit_src_rate_limiters
79 .push(rate_limiter::new_per_peer(
80 rate_limiter::GenericRateLimiterConfig { limit, interval },
81 ));
82 self
83 }
84
85 pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
86 self.reservation_rate_limiters
87 .push(rate_limiter::new_per_ip(
88 rate_limiter::GenericRateLimiterConfig { limit, interval },
89 ));
90 self
91 }
92
93 pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
94 self.circuit_src_rate_limiters
95 .push(rate_limiter::new_per_ip(
96 rate_limiter::GenericRateLimiterConfig { limit, interval },
97 ));
98 self
99 }
100}
101
102impl std::fmt::Debug for Config {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("Config")
105 .field("max_reservations", &self.max_reservations)
106 .field("max_reservations_per_peer", &self.max_reservations_per_peer)
107 .field("reservation_duration", &self.reservation_duration)
108 .field(
109 "reservation_rate_limiters",
110 &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
111 )
112 .field("max_circuits", &self.max_circuits)
113 .field("max_circuits_per_peer", &self.max_circuits_per_peer)
114 .field("max_circuit_duration", &self.max_circuit_duration)
115 .field("max_circuit_bytes", &self.max_circuit_bytes)
116 .field(
117 "circuit_src_rate_limiters",
118 &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
119 )
120 .finish()
121 }
122}
123
124impl Default for Config {
125 fn default() -> Self {
126 let reservation_rate_limiters = vec![
127 rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
130 limit: NonZeroU32::new(30).expect("30 > 0"),
131 interval: Duration::from_secs(60 * 2),
132 }),
133 rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
136 limit: NonZeroU32::new(60).expect("60 > 0"),
137 interval: Duration::from_secs(60),
138 }),
139 ];
140
141 let circuit_src_rate_limiters = vec![
142 rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
144 limit: NonZeroU32::new(30).expect("30 > 0"),
145 interval: Duration::from_secs(60 * 2),
146 }),
147 rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
149 limit: NonZeroU32::new(60).expect("60 > 0"),
150 interval: Duration::from_secs(60),
151 }),
152 ];
153
154 Config {
155 max_reservations: 128,
156 max_reservations_per_peer: 4,
157 reservation_duration: Duration::from_secs(60 * 60),
158 reservation_rate_limiters,
159
160 max_circuits: 16,
161 max_circuits_per_peer: 4,
162 max_circuit_duration: Duration::from_secs(2 * 60),
163 max_circuit_bytes: 1 << 17, circuit_src_rate_limiters,
165 }
166 }
167}
168
/// Events the relay [`Behaviour`] reports to the swarm.
///
/// In every variant `src_peer_id` is the peer whose connection handler produced the
/// underlying handler event, except where noted otherwise.
#[derive(Debug)]
pub enum Event {
    /// A handler reported that an inbound reservation request was accepted.
    ReservationReqAccepted {
        /// Peer that made the reservation.
        src_peer_id: PeerId,
        /// Forwarded unchanged from the handler.
        // NOTE(review): presumably `true` when the request renewed an existing
        // reservation — confirm against the handler.
        renewed: bool,
    },
    /// A handler reported that accepting an inbound reservation request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    ReservationReqAcceptFailed {
        /// Peer that made the reservation request.
        src_peer_id: PeerId,
        /// Error reported by the handler.
        error: inbound_hop::Error,
    },
    /// A handler reported that an inbound reservation request was denied.
    ReservationReqDenied { src_peer_id: PeerId },
    /// A handler reported that denying an inbound reservation request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    ReservationReqDenyFailed {
        /// Peer that made the reservation request.
        src_peer_id: PeerId,
        /// Error reported by the handler.
        error: inbound_hop::Error,
    },
    /// A handler reported that a reservation timed out; the behaviour has stopped
    /// tracking it.
    ReservationTimedOut { src_peer_id: PeerId },
    /// A handler reported that an inbound circuit request was denied.
    CircuitReqDenied {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
    },
    /// A handler reported that denying an inbound circuit request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    CircuitReqDenyFailed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
        /// Error reported by the handler.
        error: inbound_hop::Error,
    },
    /// A handler reported that an inbound circuit request was accepted.
    CircuitReqAccepted {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
    },
    /// Negotiating the outbound connect with the destination failed. Here the event
    /// originates from the destination's handler; the source's request is denied as a
    /// consequence.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    CircuitReqOutboundConnectFailed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer whose handler reported the failure.
        dst_peer_id: PeerId,
        /// Error reported by the handler.
        error: outbound_stop::Error,
    },
    /// A handler reported that accepting an inbound circuit request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    CircuitReqAcceptFailed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
        /// Error reported by the handler.
        error: inbound_hop::Error,
    },
    /// An accepted circuit has closed, either because a handler reported it or because
    /// one of its two connections was closed.
    CircuitClosed {
        /// Source peer of the circuit.
        src_peer_id: PeerId,
        /// Destination peer of the circuit.
        dst_peer_id: PeerId,
        /// Error as reported by the handler, or `ConnectionAborted` when the circuit
        /// was dropped because a connection closed.
        error: Option<std::io::Error>,
    },
}
242
/// Relay behaviour: tracks reservations and circuits, decides whether incoming
/// reservation and circuit requests are accepted, and queues the resulting swarm
/// actions.
pub struct Behaviour {
    /// Limits, rate limiters and handler parameters.
    config: Config,

    /// Local peer id; appended as a `/p2p` component to external addresses that do
    /// not already end in one when a reservation is accepted.
    local_peer_id: PeerId,

    /// For each peer, the connections on which it holds a reservation. Peers with an
    /// empty set are removed from the map.
    reservations: HashMap<PeerId, HashSet<ConnectionId>>,
    /// All circuits currently being accepted or already accepted.
    circuits: CircuitsTracker,

    /// Actions waiting to be returned to the swarm, one per `poll` call, in FIFO order.
    queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,

    /// External addresses, updated from swarm events and handed out in accepted
    /// reservations.
    external_addresses: ExternalAddresses,
}
258
259impl Behaviour {
260 pub fn new(local_peer_id: PeerId, config: Config) -> Self {
261 Self {
262 config,
263 local_peer_id,
264 reservations: Default::default(),
265 circuits: Default::default(),
266 queued_actions: Default::default(),
267 external_addresses: Default::default(),
268 }
269 }
270
271 fn on_connection_closed(
272 &mut self,
273 ConnectionClosed {
274 peer_id,
275 connection_id,
276 ..
277 }: ConnectionClosed,
278 ) {
279 if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
280 peer.get_mut().remove(&connection_id);
281 if peer.get().is_empty() {
282 peer.remove();
283 }
284 }
285
286 for circuit in self
287 .circuits
288 .remove_by_connection(peer_id, connection_id)
289 .iter()
290 .filter(|c| matches!(c.status, CircuitStatus::Accepted))
292 {
293 self.queued_actions
294 .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
295 src_peer_id: circuit.src_peer_id,
296 dst_peer_id: circuit.dst_peer_id,
297 error: Some(std::io::ErrorKind::ConnectionAborted.into()),
298 }));
299 }
300 }
301}
302
303impl NetworkBehaviour for Behaviour {
304 type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
305 type ToSwarm = Event;
306
307 fn handle_established_inbound_connection(
308 &mut self,
309 _: ConnectionId,
310 _: PeerId,
311 local_addr: &Multiaddr,
312 remote_addr: &Multiaddr,
313 ) -> Result<THandler<Self>, ConnectionDenied> {
314 if local_addr.is_relayed() {
315 return Ok(Either::Right(dummy::ConnectionHandler));
317 }
318
319 Ok(Either::Left(Handler::new(
320 handler::Config {
321 reservation_duration: self.config.reservation_duration,
322 max_circuit_duration: self.config.max_circuit_duration,
323 max_circuit_bytes: self.config.max_circuit_bytes,
324 },
325 ConnectedPoint::Listener {
326 local_addr: local_addr.clone(),
327 send_back_addr: remote_addr.clone(),
328 },
329 )))
330 }
331
332 fn handle_established_outbound_connection(
333 &mut self,
334 _: ConnectionId,
335 _: PeerId,
336 addr: &Multiaddr,
337 role_override: Endpoint,
338 port_use: PortUse,
339 ) -> Result<THandler<Self>, ConnectionDenied> {
340 if addr.is_relayed() {
341 return Ok(Either::Right(dummy::ConnectionHandler));
343 }
344
345 Ok(Either::Left(Handler::new(
346 handler::Config {
347 reservation_duration: self.config.reservation_duration,
348 max_circuit_duration: self.config.max_circuit_duration,
349 max_circuit_bytes: self.config.max_circuit_bytes,
350 },
351 ConnectedPoint::Dialer {
352 address: addr.clone(),
353 role_override,
354 port_use,
355 },
356 )))
357 }
358
359 fn on_swarm_event(&mut self, event: FromSwarm) {
360 self.external_addresses.on_swarm_event(&event);
361
362 if let FromSwarm::ConnectionClosed(connection_closed) = event {
363 self.on_connection_closed(connection_closed)
364 }
365 }
366
367 fn on_connection_handler_event(
368 &mut self,
369 event_source: PeerId,
370 connection: ConnectionId,
371 event: THandlerOutEvent<Self>,
372 ) {
373 let event = match event {
374 Either::Left(e) => e,
375 #[allow(unreachable_patterns)]
377 Either::Right(v) => libp2p_core::util::unreachable(v),
378 };
379
380 match event {
381 handler::Event::ReservationReqReceived {
382 inbound_reservation_req,
383 endpoint,
384 renewed,
385 } => {
386 let now = Instant::now();
387
388 assert!(
389 !endpoint.is_relayed(),
390 "`dummy::ConnectionHandler` handles relayed connections. It \
391 denies all inbound substreams."
392 );
393
394 let action = if
395 (!renewed
398 && self
399 .reservations
400 .get(&event_source)
401 .map(|cs| cs.len())
402 .unwrap_or(0)
403 > self.config.max_reservations_per_peer)
404 || self
406 .reservations
407 .values()
408 .map(|cs| cs.len())
409 .sum::<usize>()
410 >= self.config.max_reservations
411 || !self
413 .config
414 .reservation_rate_limiters
415 .iter_mut()
416 .all(|limiter| {
417 limiter.try_next(event_source, endpoint.get_remote_address(), now)
418 }) {
419 ToSwarm::NotifyHandler {
420 handler: NotifyHandler::One(connection),
421 peer_id: event_source,
422 event: Either::Left(handler::In::DenyReservationReq {
423 inbound_reservation_req,
424 status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
425 }),
426 }
427 } else {
428 self.reservations
430 .entry(event_source)
431 .or_default()
432 .insert(connection);
433
434 ToSwarm::NotifyHandler {
435 handler: NotifyHandler::One(connection),
436 peer_id: event_source,
437 event: Either::Left(handler::In::AcceptReservationReq {
438 inbound_reservation_req,
439 addrs: self
440 .external_addresses
441 .iter()
442 .cloned()
443 .filter_map(|a| match a.iter().last()? {
445 Protocol::P2p(_) => Some(a),
446 _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
447 })
448 .collect(),
449 }),
450 }
451 };
452
453 self.queued_actions.push_back(action);
454 }
455 handler::Event::ReservationReqAccepted { renewed } => {
456 self.reservations
459 .entry(event_source)
460 .or_default()
461 .insert(connection);
462
463 self.queued_actions.push_back(ToSwarm::GenerateEvent(
464 Event::ReservationReqAccepted {
465 src_peer_id: event_source,
466 renewed,
467 },
468 ));
469 }
470 handler::Event::ReservationReqAcceptFailed { error } => {
471 #[allow(deprecated)]
472 self.queued_actions.push_back(ToSwarm::GenerateEvent(
473 Event::ReservationReqAcceptFailed {
474 src_peer_id: event_source,
475 error,
476 },
477 ));
478 }
479 handler::Event::ReservationReqDenied {} => {
480 self.queued_actions.push_back(ToSwarm::GenerateEvent(
481 Event::ReservationReqDenied {
482 src_peer_id: event_source,
483 },
484 ));
485 }
486 handler::Event::ReservationReqDenyFailed { error } => {
487 #[allow(deprecated)]
488 self.queued_actions.push_back(ToSwarm::GenerateEvent(
489 Event::ReservationReqDenyFailed {
490 src_peer_id: event_source,
491 error,
492 },
493 ));
494 }
495 handler::Event::ReservationTimedOut {} => {
496 match self.reservations.entry(event_source) {
497 hash_map::Entry::Occupied(mut peer) => {
498 peer.get_mut().remove(&connection);
499 if peer.get().is_empty() {
500 peer.remove();
501 }
502 }
503 hash_map::Entry::Vacant(_) => {
504 unreachable!(
505 "Expect to track timed out reservation with peer {:?} on connection {:?}",
506 event_source,
507 connection,
508 );
509 }
510 }
511
512 self.queued_actions
513 .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
514 src_peer_id: event_source,
515 }));
516 }
517 handler::Event::CircuitReqReceived {
518 inbound_circuit_req,
519 endpoint,
520 } => {
521 let now = Instant::now();
522
523 assert!(
524 !endpoint.is_relayed(),
525 "`dummy::ConnectionHandler` handles relayed connections. It \
526 denies all inbound substreams."
527 );
528
529 let action = if self.circuits.num_circuits_of_peer(event_source)
530 > self.config.max_circuits_per_peer
531 || self.circuits.len() >= self.config.max_circuits
532 || !self
533 .config
534 .circuit_src_rate_limiters
535 .iter_mut()
536 .all(|limiter| {
537 limiter.try_next(event_source, endpoint.get_remote_address(), now)
538 }) {
539 ToSwarm::NotifyHandler {
541 handler: NotifyHandler::One(connection),
542 peer_id: event_source,
543 event: Either::Left(handler::In::DenyCircuitReq {
544 circuit_id: None,
545 inbound_circuit_req,
546 status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
547 }),
548 }
549 } else if let Some(dst_conn) = self
550 .reservations
551 .get(&inbound_circuit_req.dst())
552 .and_then(|cs| cs.iter().next())
553 {
554 let circuit_id = self.circuits.insert(Circuit {
556 status: CircuitStatus::Accepting,
557 src_peer_id: event_source,
558 src_connection_id: connection,
559 dst_peer_id: inbound_circuit_req.dst(),
560 dst_connection_id: *dst_conn,
561 });
562
563 ToSwarm::NotifyHandler {
564 handler: NotifyHandler::One(*dst_conn),
565 peer_id: event_source,
566 event: Either::Left(handler::In::NegotiateOutboundConnect {
567 circuit_id,
568 inbound_circuit_req,
569 src_peer_id: event_source,
570 src_connection_id: connection,
571 }),
572 }
573 } else {
574 ToSwarm::NotifyHandler {
576 handler: NotifyHandler::One(connection),
577 peer_id: event_source,
578 event: Either::Left(handler::In::DenyCircuitReq {
579 circuit_id: None,
580 inbound_circuit_req,
581 status: proto::Status::NO_RESERVATION,
582 }),
583 }
584 };
585 self.queued_actions.push_back(action);
586 }
587 handler::Event::CircuitReqDenied {
588 circuit_id,
589 dst_peer_id,
590 } => {
591 if let Some(circuit_id) = circuit_id {
592 self.circuits.remove(circuit_id);
593 }
594
595 self.queued_actions
596 .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
597 src_peer_id: event_source,
598 dst_peer_id,
599 }));
600 }
601 handler::Event::CircuitReqDenyFailed {
602 circuit_id,
603 dst_peer_id,
604 error,
605 } => {
606 if let Some(circuit_id) = circuit_id {
607 self.circuits.remove(circuit_id);
608 }
609
610 #[allow(deprecated)]
611 self.queued_actions.push_back(ToSwarm::GenerateEvent(
612 Event::CircuitReqDenyFailed {
613 src_peer_id: event_source,
614 dst_peer_id,
615 error,
616 },
617 ));
618 }
619 handler::Event::OutboundConnectNegotiated {
620 circuit_id,
621 src_peer_id,
622 src_connection_id,
623 inbound_circuit_req,
624 dst_stream,
625 dst_pending_data,
626 } => {
627 self.queued_actions.push_back(ToSwarm::NotifyHandler {
628 handler: NotifyHandler::One(src_connection_id),
629 peer_id: src_peer_id,
630 event: Either::Left(handler::In::AcceptAndDriveCircuit {
631 circuit_id,
632 dst_peer_id: event_source,
633 inbound_circuit_req,
634 dst_stream,
635 dst_pending_data,
636 }),
637 });
638 }
639 handler::Event::OutboundConnectNegotiationFailed {
640 circuit_id,
641 src_peer_id,
642 src_connection_id,
643 inbound_circuit_req,
644 status,
645 error,
646 } => {
647 self.queued_actions.push_back(ToSwarm::NotifyHandler {
648 handler: NotifyHandler::One(src_connection_id),
649 peer_id: src_peer_id,
650 event: Either::Left(handler::In::DenyCircuitReq {
651 circuit_id: Some(circuit_id),
652 inbound_circuit_req,
653 status,
654 }),
655 });
656 #[allow(deprecated)]
657 self.queued_actions.push_back(ToSwarm::GenerateEvent(
658 Event::CircuitReqOutboundConnectFailed {
659 src_peer_id,
660 dst_peer_id: event_source,
661 error,
662 },
663 ));
664 }
665 handler::Event::CircuitReqAccepted {
666 dst_peer_id,
667 circuit_id,
668 } => {
669 self.circuits.accepted(circuit_id);
670 self.queued_actions
671 .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
672 src_peer_id: event_source,
673 dst_peer_id,
674 }));
675 }
676 handler::Event::CircuitReqAcceptFailed {
677 dst_peer_id,
678 circuit_id,
679 error,
680 } => {
681 self.circuits.remove(circuit_id);
682 #[allow(deprecated)]
683 self.queued_actions.push_back(ToSwarm::GenerateEvent(
684 Event::CircuitReqAcceptFailed {
685 src_peer_id: event_source,
686 dst_peer_id,
687 error,
688 },
689 ));
690 }
691 handler::Event::CircuitClosed {
692 dst_peer_id,
693 circuit_id,
694 error,
695 } => {
696 self.circuits.remove(circuit_id);
697
698 self.queued_actions
699 .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
700 src_peer_id: event_source,
701 dst_peer_id,
702 error,
703 }));
704 }
705 }
706 }
707
708 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
709 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
710 if let Some(to_swarm) = self.queued_actions.pop_front() {
711 return Poll::Ready(to_swarm);
712 }
713
714 Poll::Pending
715 }
716}
717
/// Bookkeeping for all circuits relayed by the behaviour, keyed by [`CircuitId`].
#[derive(Default)]
struct CircuitsTracker {
    /// Id that the next inserted circuit will receive; advanced by one per insert.
    next_id: CircuitId,
    /// Circuits currently tracked, in any status.
    circuits: HashMap<CircuitId, Circuit>,
}
723
724impl CircuitsTracker {
725 fn len(&self) -> usize {
726 self.circuits.len()
727 }
728
729 fn insert(&mut self, circuit: Circuit) -> CircuitId {
730 let id = self.next_id;
731 self.next_id = self.next_id + 1;
732
733 self.circuits.insert(id, circuit);
734
735 id
736 }
737
738 fn accepted(&mut self, circuit_id: CircuitId) {
739 if let Some(c) = self.circuits.get_mut(&circuit_id) {
740 c.status = CircuitStatus::Accepted;
741 };
742 }
743
744 fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
745 self.circuits.remove(&circuit_id)
746 }
747
748 fn remove_by_connection(
749 &mut self,
750 peer_id: PeerId,
751 connection_id: ConnectionId,
752 ) -> Vec<Circuit> {
753 let mut removed = vec![];
754
755 self.circuits.retain(|_circuit_id, circuit| {
756 let is_src =
757 circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
758 let is_dst =
759 circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
760
761 if is_src || is_dst {
762 removed.push(circuit.clone());
763 false
765 } else {
766 true
768 }
769 });
770
771 removed
772 }
773
774 fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
775 self.circuits
776 .iter()
777 .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
778 .count()
779 }
780}
781
/// A single relayed circuit between a source and a destination peer.
#[derive(Clone)]
struct Circuit {
    /// Peer that requested the circuit.
    src_peer_id: PeerId,
    /// Connection on which the circuit request arrived.
    src_connection_id: ConnectionId,
    /// Peer the circuit was requested to.
    dst_peer_id: PeerId,
    /// Connection to the destination, taken from its reservations.
    dst_connection_id: ConnectionId,
    /// Whether the circuit is still being set up or already accepted.
    status: CircuitStatus,
}
790
/// Lifecycle state of a tracked [`Circuit`].
#[derive(Clone)]
enum CircuitStatus {
    /// Initial state, assigned when the circuit is inserted after a circuit request
    /// has been forwarded to the destination.
    Accepting,
    /// Set once a handler reports the circuit request as accepted. Only circuits in
    /// this state produce a `CircuitClosed` event when a connection closes.
    Accepted,
}
796
/// Identifier of a circuit tracked by the relay behaviour; defaults to `0`.
#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct CircuitId(u64);

impl Add<u64> for CircuitId {
    type Output = CircuitId;

    /// Returns the id that lies `rhs` after `self`.
    fn add(self, rhs: u64) -> Self {
        let CircuitId(current) = self;
        CircuitId(current + rhs)
    }
}