libp2p_relay/
behaviour.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`NetworkBehaviour`] to act as a circuit relay v2 **relay**.
22
23pub(crate) mod handler;
24pub(crate) mod rate_limiter;
25use std::{
26    collections::{hash_map, HashMap, HashSet, VecDeque},
27    num::NonZeroU32,
28    ops::Add,
29    task::{Context, Poll},
30    time::Duration,
31};
32
33use either::Either;
34use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37    behaviour::{ConnectionClosed, FromSwarm},
38    dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler,
39    THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
40};
41use web_time::Instant;
42
43use crate::{
44    behaviour::handler::Handler,
45    multiaddr_ext::MultiaddrExt,
46    proto,
47    protocol::{inbound_hop, outbound_stop},
48};
49
/// Configuration for the relay [`Behaviour`].
///
/// The limits below are enforced by the [`Behaviour`] when it decides whether
/// to accept or deny inbound reservation and circuit requests; the durations
/// and the byte limit are forwarded to each connection handler.
///
/// # Panics
///
/// [`Config::max_circuit_duration`] may not exceed [`u32::MAX`].
// NOTE(review): presumably the bound applies to the duration in seconds —
// confirm against the handler / protocol encoding.
pub struct Config {
    /// Limit on the total number of reservations tracked across all peers.
    /// Once reached, further reservation requests are denied.
    pub max_reservations: usize,
    /// Limit on the number of reservations held by a single peer (tracked as
    /// the number of that peer's connections with a reservation). Only
    /// checked for new reservations, not for renewals.
    pub max_reservations_per_peer: usize,
    /// Forwarded to every connection handler; presumably the lifetime of an
    /// accepted reservation — confirm in `handler`.
    pub reservation_duration: Duration,
    /// Rate limiters consulted for every inbound reservation request. The
    /// request is denied unless all of them permit it.
    pub reservation_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,

    /// Limit on the total number of circuits tracked at the same time.
    pub max_circuits: usize,
    /// Limit on the number of circuits a single peer takes part in, counted
    /// both as source and as destination.
    pub max_circuits_per_peer: usize,
    /// Forwarded to every connection handler; presumably the maximum lifetime
    /// of a circuit — confirm in `handler`.
    pub max_circuit_duration: Duration,
    /// Forwarded to every connection handler; presumably the maximum number
    /// of bytes relayed over a circuit — confirm in `handler`.
    pub max_circuit_bytes: u64,
    /// Rate limiters consulted for every inbound circuit request, keyed on
    /// the source peer. The request is denied unless all of them permit it.
    pub circuit_src_rate_limiters: Vec<Box<dyn rate_limiter::RateLimiter>>,
}
67
68impl Config {
69    pub fn reservation_rate_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
70        self.reservation_rate_limiters
71            .push(rate_limiter::new_per_peer(
72                rate_limiter::GenericRateLimiterConfig { limit, interval },
73            ));
74        self
75    }
76
77    pub fn circuit_src_per_peer(mut self, limit: NonZeroU32, interval: Duration) -> Self {
78        self.circuit_src_rate_limiters
79            .push(rate_limiter::new_per_peer(
80                rate_limiter::GenericRateLimiterConfig { limit, interval },
81            ));
82        self
83    }
84
85    pub fn reservation_rate_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
86        self.reservation_rate_limiters
87            .push(rate_limiter::new_per_ip(
88                rate_limiter::GenericRateLimiterConfig { limit, interval },
89            ));
90        self
91    }
92
93    pub fn circuit_src_per_ip(mut self, limit: NonZeroU32, interval: Duration) -> Self {
94        self.circuit_src_rate_limiters
95            .push(rate_limiter::new_per_ip(
96                rate_limiter::GenericRateLimiterConfig { limit, interval },
97            ));
98        self
99    }
100}
101
102impl std::fmt::Debug for Config {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("Config")
105            .field("max_reservations", &self.max_reservations)
106            .field("max_reservations_per_peer", &self.max_reservations_per_peer)
107            .field("reservation_duration", &self.reservation_duration)
108            .field(
109                "reservation_rate_limiters",
110                &format!("[{} rate limiters]", self.reservation_rate_limiters.len()),
111            )
112            .field("max_circuits", &self.max_circuits)
113            .field("max_circuits_per_peer", &self.max_circuits_per_peer)
114            .field("max_circuit_duration", &self.max_circuit_duration)
115            .field("max_circuit_bytes", &self.max_circuit_bytes)
116            .field(
117                "circuit_src_rate_limiters",
118                &format!("[{} rate limiters]", self.circuit_src_rate_limiters.len()),
119            )
120            .finish()
121    }
122}
123
124impl Default for Config {
125    fn default() -> Self {
126        let reservation_rate_limiters = vec![
127            // For each peer ID one reservation every 2 minutes with up
128            // to 30 reservations per hour.
129            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
130                limit: NonZeroU32::new(30).expect("30 > 0"),
131                interval: Duration::from_secs(60 * 2),
132            }),
133            // For each IP address one reservation every minute with up
134            // to 60 reservations per hour.
135            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
136                limit: NonZeroU32::new(60).expect("60 > 0"),
137                interval: Duration::from_secs(60),
138            }),
139        ];
140
141        let circuit_src_rate_limiters = vec![
142            // For each source peer ID one circuit every 2 minute with up to 30 circuits per hour.
143            rate_limiter::new_per_peer(rate_limiter::GenericRateLimiterConfig {
144                limit: NonZeroU32::new(30).expect("30 > 0"),
145                interval: Duration::from_secs(60 * 2),
146            }),
147            // For each source IP address one circuit every minute with up to 60 circuits per hour.
148            rate_limiter::new_per_ip(rate_limiter::GenericRateLimiterConfig {
149                limit: NonZeroU32::new(60).expect("60 > 0"),
150                interval: Duration::from_secs(60),
151            }),
152        ];
153
154        Config {
155            max_reservations: 128,
156            max_reservations_per_peer: 4,
157            reservation_duration: Duration::from_secs(60 * 60),
158            reservation_rate_limiters,
159
160            max_circuits: 16,
161            max_circuits_per_peer: 4,
162            max_circuit_duration: Duration::from_secs(2 * 60),
163            max_circuit_bytes: 1 << 17, // 128 kibibyte
164            circuit_src_rate_limiters,
165        }
166    }
167}
168
/// The events produced by the relay `Behaviour`.
///
/// In every variant `src_peer_id` is the peer that sent the request to the
/// relay and, for circuit events, `dst_peer_id` is the peer the circuit
/// leads (or was meant to lead) to.
#[derive(Debug)]
pub enum Event {
    /// An inbound reservation request has been accepted.
    ReservationReqAccepted {
        /// Peer that requested the reservation.
        src_peer_id: PeerId,
        /// Indicates whether the request replaces an existing reservation.
        renewed: bool,
    },
    /// Accepting an inbound reservation request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    ReservationReqAcceptFailed {
        /// Peer that requested the reservation.
        src_peer_id: PeerId,
        /// Error reported by the connection handler.
        error: inbound_hop::Error,
    },
    /// An inbound reservation request has been denied.
    ReservationReqDenied { src_peer_id: PeerId },
    /// Denying an inbound reservation request has failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    ReservationReqDenyFailed {
        /// Peer that requested the reservation.
        src_peer_id: PeerId,
        /// Error reported by the connection handler.
        error: inbound_hop::Error,
    },
    /// An inbound reservation has timed out.
    ReservationTimedOut { src_peer_id: PeerId },
    /// An inbound circuit request has been denied.
    CircuitReqDenied {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
    },
    /// Denying an inbound circuit request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    CircuitReqDenyFailed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
        /// Error reported by the connection handler.
        error: inbound_hop::Error,
    },
    /// An inbound circuit request has been accepted.
    CircuitReqAccepted {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit leads to.
        dst_peer_id: PeerId,
    },
    /// An outbound connect for an inbound circuit request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    CircuitReqOutboundConnectFailed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the relay tried to connect the circuit to.
        dst_peer_id: PeerId,
        /// Error reported by the destination's connection handler.
        error: outbound_stop::Error,
    },
    /// Accepting an inbound circuit request failed.
    #[deprecated(
        note = "Will be removed in favor of logging them internally, see <https://github.com/libp2p/rust-libp2p/issues/4757> for details."
    )]
    CircuitReqAcceptFailed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit was requested to.
        dst_peer_id: PeerId,
        /// Error reported by the connection handler.
        error: inbound_hop::Error,
    },
    /// An inbound circuit has closed.
    CircuitClosed {
        /// Peer that requested the circuit.
        src_peer_id: PeerId,
        /// Peer the circuit led to.
        dst_peer_id: PeerId,
        /// Error passed on from the connection handler, if any. Set to
        /// [`std::io::ErrorKind::ConnectionAborted`] when the circuit is
        /// dropped because one of its two connections closed.
        error: Option<std::io::Error>,
    },
}
242
/// [`NetworkBehaviour`] implementation of the relay server
/// functionality of the circuit relay v2 protocol.
pub struct Behaviour {
    /// Limits and rate limiters applied to inbound requests.
    config: Config,

    /// Peer ID of the local node; appended to advertised external addresses
    /// that do not yet end in a `/p2p/...` component.
    local_peer_id: PeerId,

    /// Connections, grouped by peer, on which a reservation is currently
    /// tracked. A peer without any reservation has no entry.
    reservations: HashMap<PeerId, HashSet<ConnectionId>>,
    /// Circuits that are being accepted or are already established.
    circuits: CircuitsTracker,

    /// Queue of actions to return when polled.
    queued_actions: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,

    /// External addresses of the local node as reported by the swarm; handed
    /// to peers whose reservation request is accepted.
    external_addresses: ExternalAddresses,
}
258
259impl Behaviour {
260    pub fn new(local_peer_id: PeerId, config: Config) -> Self {
261        Self {
262            config,
263            local_peer_id,
264            reservations: Default::default(),
265            circuits: Default::default(),
266            queued_actions: Default::default(),
267            external_addresses: Default::default(),
268        }
269    }
270
271    fn on_connection_closed(
272        &mut self,
273        ConnectionClosed {
274            peer_id,
275            connection_id,
276            ..
277        }: ConnectionClosed,
278    ) {
279        if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) {
280            peer.get_mut().remove(&connection_id);
281            if peer.get().is_empty() {
282                peer.remove();
283            }
284        }
285
286        for circuit in self
287            .circuits
288            .remove_by_connection(peer_id, connection_id)
289            .iter()
290            // Only emit [`CircuitClosed`] for accepted requests.
291            .filter(|c| matches!(c.status, CircuitStatus::Accepted))
292        {
293            self.queued_actions
294                .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
295                    src_peer_id: circuit.src_peer_id,
296                    dst_peer_id: circuit.dst_peer_id,
297                    error: Some(std::io::ErrorKind::ConnectionAborted.into()),
298                }));
299        }
300    }
301}
302
303impl NetworkBehaviour for Behaviour {
304    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
305    type ToSwarm = Event;
306
307    fn handle_established_inbound_connection(
308        &mut self,
309        _: ConnectionId,
310        _: PeerId,
311        local_addr: &Multiaddr,
312        remote_addr: &Multiaddr,
313    ) -> Result<THandler<Self>, ConnectionDenied> {
314        if local_addr.is_relayed() {
315            // Deny all substreams on relayed connection.
316            return Ok(Either::Right(dummy::ConnectionHandler));
317        }
318
319        Ok(Either::Left(Handler::new(
320            handler::Config {
321                reservation_duration: self.config.reservation_duration,
322                max_circuit_duration: self.config.max_circuit_duration,
323                max_circuit_bytes: self.config.max_circuit_bytes,
324            },
325            ConnectedPoint::Listener {
326                local_addr: local_addr.clone(),
327                send_back_addr: remote_addr.clone(),
328            },
329        )))
330    }
331
332    fn handle_established_outbound_connection(
333        &mut self,
334        _: ConnectionId,
335        _: PeerId,
336        addr: &Multiaddr,
337        role_override: Endpoint,
338        port_use: PortUse,
339    ) -> Result<THandler<Self>, ConnectionDenied> {
340        if addr.is_relayed() {
341            // Deny all substreams on relayed connection.
342            return Ok(Either::Right(dummy::ConnectionHandler));
343        }
344
345        Ok(Either::Left(Handler::new(
346            handler::Config {
347                reservation_duration: self.config.reservation_duration,
348                max_circuit_duration: self.config.max_circuit_duration,
349                max_circuit_bytes: self.config.max_circuit_bytes,
350            },
351            ConnectedPoint::Dialer {
352                address: addr.clone(),
353                role_override,
354                port_use,
355            },
356        )))
357    }
358
359    fn on_swarm_event(&mut self, event: FromSwarm) {
360        self.external_addresses.on_swarm_event(&event);
361
362        if let FromSwarm::ConnectionClosed(connection_closed) = event {
363            self.on_connection_closed(connection_closed)
364        }
365    }
366
367    fn on_connection_handler_event(
368        &mut self,
369        event_source: PeerId,
370        connection: ConnectionId,
371        event: THandlerOutEvent<Self>,
372    ) {
373        let event = match event {
374            Either::Left(e) => e,
375            // TODO: remove when Rust 1.82 is MSRV
376            #[allow(unreachable_patterns)]
377            Either::Right(v) => libp2p_core::util::unreachable(v),
378        };
379
380        match event {
381            handler::Event::ReservationReqReceived {
382                inbound_reservation_req,
383                endpoint,
384                renewed,
385            } => {
386                let now = Instant::now();
387
388                assert!(
389                    !endpoint.is_relayed(),
390                    "`dummy::ConnectionHandler` handles relayed connections. It \
391                     denies all inbound substreams."
392                );
393
394                let action = if
395                // Deny if it is a new reservation and exceeds
396                // `max_reservations_per_peer`.
397                (!renewed
398                    && self
399                        .reservations
400                        .get(&event_source)
401                        .map(|cs| cs.len())
402                        .unwrap_or(0)
403                        > self.config.max_reservations_per_peer)
404                    // Deny if it exceeds `max_reservations`.
405                    || self
406                        .reservations
407                        .values()
408                        .map(|cs| cs.len())
409                        .sum::<usize>()
410                        >= self.config.max_reservations
411                    // Deny if it exceeds the allowed rate of reservations.
412                    || !self
413                        .config
414                        .reservation_rate_limiters
415                        .iter_mut()
416                        .all(|limiter| {
417                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
418                        }) {
419                    ToSwarm::NotifyHandler {
420                        handler: NotifyHandler::One(connection),
421                        peer_id: event_source,
422                        event: Either::Left(handler::In::DenyReservationReq {
423                            inbound_reservation_req,
424                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
425                        }),
426                    }
427                } else {
428                    // Accept reservation.
429                    self.reservations
430                        .entry(event_source)
431                        .or_default()
432                        .insert(connection);
433
434                    ToSwarm::NotifyHandler {
435                        handler: NotifyHandler::One(connection),
436                        peer_id: event_source,
437                        event: Either::Left(handler::In::AcceptReservationReq {
438                            inbound_reservation_req,
439                            addrs: self
440                                .external_addresses
441                                .iter()
442                                .cloned()
443                                // Add local peer ID in case it isn't present yet.
444                                .filter_map(|a| match a.iter().last()? {
445                                    Protocol::P2p(_) => Some(a),
446                                    _ => Some(a.with(Protocol::P2p(self.local_peer_id))),
447                                })
448                                .collect(),
449                        }),
450                    }
451                };
452
453                self.queued_actions.push_back(action);
454            }
455            handler::Event::ReservationReqAccepted { renewed } => {
456                // Ensure local eventual consistent reservation state matches handler (source of
457                // truth).
458                self.reservations
459                    .entry(event_source)
460                    .or_default()
461                    .insert(connection);
462
463                self.queued_actions.push_back(ToSwarm::GenerateEvent(
464                    Event::ReservationReqAccepted {
465                        src_peer_id: event_source,
466                        renewed,
467                    },
468                ));
469            }
470            handler::Event::ReservationReqAcceptFailed { error } => {
471                #[allow(deprecated)]
472                self.queued_actions.push_back(ToSwarm::GenerateEvent(
473                    Event::ReservationReqAcceptFailed {
474                        src_peer_id: event_source,
475                        error,
476                    },
477                ));
478            }
479            handler::Event::ReservationReqDenied {} => {
480                self.queued_actions.push_back(ToSwarm::GenerateEvent(
481                    Event::ReservationReqDenied {
482                        src_peer_id: event_source,
483                    },
484                ));
485            }
486            handler::Event::ReservationReqDenyFailed { error } => {
487                #[allow(deprecated)]
488                self.queued_actions.push_back(ToSwarm::GenerateEvent(
489                    Event::ReservationReqDenyFailed {
490                        src_peer_id: event_source,
491                        error,
492                    },
493                ));
494            }
495            handler::Event::ReservationTimedOut {} => {
496                match self.reservations.entry(event_source) {
497                    hash_map::Entry::Occupied(mut peer) => {
498                        peer.get_mut().remove(&connection);
499                        if peer.get().is_empty() {
500                            peer.remove();
501                        }
502                    }
503                    hash_map::Entry::Vacant(_) => {
504                        unreachable!(
505                            "Expect to track timed out reservation with peer {:?} on connection {:?}",
506                            event_source,
507                            connection,
508                        );
509                    }
510                }
511
512                self.queued_actions
513                    .push_back(ToSwarm::GenerateEvent(Event::ReservationTimedOut {
514                        src_peer_id: event_source,
515                    }));
516            }
517            handler::Event::CircuitReqReceived {
518                inbound_circuit_req,
519                endpoint,
520            } => {
521                let now = Instant::now();
522
523                assert!(
524                    !endpoint.is_relayed(),
525                    "`dummy::ConnectionHandler` handles relayed connections. It \
526                     denies all inbound substreams."
527                );
528
529                let action = if self.circuits.num_circuits_of_peer(event_source)
530                    > self.config.max_circuits_per_peer
531                    || self.circuits.len() >= self.config.max_circuits
532                    || !self
533                        .config
534                        .circuit_src_rate_limiters
535                        .iter_mut()
536                        .all(|limiter| {
537                            limiter.try_next(event_source, endpoint.get_remote_address(), now)
538                        }) {
539                    // Deny circuit exceeding limits.
540                    ToSwarm::NotifyHandler {
541                        handler: NotifyHandler::One(connection),
542                        peer_id: event_source,
543                        event: Either::Left(handler::In::DenyCircuitReq {
544                            circuit_id: None,
545                            inbound_circuit_req,
546                            status: proto::Status::RESOURCE_LIMIT_EXCEEDED,
547                        }),
548                    }
549                } else if let Some(dst_conn) = self
550                    .reservations
551                    .get(&inbound_circuit_req.dst())
552                    .and_then(|cs| cs.iter().next())
553                {
554                    // Accept circuit request if reservation present.
555                    let circuit_id = self.circuits.insert(Circuit {
556                        status: CircuitStatus::Accepting,
557                        src_peer_id: event_source,
558                        src_connection_id: connection,
559                        dst_peer_id: inbound_circuit_req.dst(),
560                        dst_connection_id: *dst_conn,
561                    });
562
563                    ToSwarm::NotifyHandler {
564                        handler: NotifyHandler::One(*dst_conn),
565                        peer_id: event_source,
566                        event: Either::Left(handler::In::NegotiateOutboundConnect {
567                            circuit_id,
568                            inbound_circuit_req,
569                            src_peer_id: event_source,
570                            src_connection_id: connection,
571                        }),
572                    }
573                } else {
574                    // Deny circuit request if no reservation present.
575                    ToSwarm::NotifyHandler {
576                        handler: NotifyHandler::One(connection),
577                        peer_id: event_source,
578                        event: Either::Left(handler::In::DenyCircuitReq {
579                            circuit_id: None,
580                            inbound_circuit_req,
581                            status: proto::Status::NO_RESERVATION,
582                        }),
583                    }
584                };
585                self.queued_actions.push_back(action);
586            }
587            handler::Event::CircuitReqDenied {
588                circuit_id,
589                dst_peer_id,
590            } => {
591                if let Some(circuit_id) = circuit_id {
592                    self.circuits.remove(circuit_id);
593                }
594
595                self.queued_actions
596                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqDenied {
597                        src_peer_id: event_source,
598                        dst_peer_id,
599                    }));
600            }
601            handler::Event::CircuitReqDenyFailed {
602                circuit_id,
603                dst_peer_id,
604                error,
605            } => {
606                if let Some(circuit_id) = circuit_id {
607                    self.circuits.remove(circuit_id);
608                }
609
610                #[allow(deprecated)]
611                self.queued_actions.push_back(ToSwarm::GenerateEvent(
612                    Event::CircuitReqDenyFailed {
613                        src_peer_id: event_source,
614                        dst_peer_id,
615                        error,
616                    },
617                ));
618            }
619            handler::Event::OutboundConnectNegotiated {
620                circuit_id,
621                src_peer_id,
622                src_connection_id,
623                inbound_circuit_req,
624                dst_stream,
625                dst_pending_data,
626            } => {
627                self.queued_actions.push_back(ToSwarm::NotifyHandler {
628                    handler: NotifyHandler::One(src_connection_id),
629                    peer_id: src_peer_id,
630                    event: Either::Left(handler::In::AcceptAndDriveCircuit {
631                        circuit_id,
632                        dst_peer_id: event_source,
633                        inbound_circuit_req,
634                        dst_stream,
635                        dst_pending_data,
636                    }),
637                });
638            }
639            handler::Event::OutboundConnectNegotiationFailed {
640                circuit_id,
641                src_peer_id,
642                src_connection_id,
643                inbound_circuit_req,
644                status,
645                error,
646            } => {
647                self.queued_actions.push_back(ToSwarm::NotifyHandler {
648                    handler: NotifyHandler::One(src_connection_id),
649                    peer_id: src_peer_id,
650                    event: Either::Left(handler::In::DenyCircuitReq {
651                        circuit_id: Some(circuit_id),
652                        inbound_circuit_req,
653                        status,
654                    }),
655                });
656                #[allow(deprecated)]
657                self.queued_actions.push_back(ToSwarm::GenerateEvent(
658                    Event::CircuitReqOutboundConnectFailed {
659                        src_peer_id,
660                        dst_peer_id: event_source,
661                        error,
662                    },
663                ));
664            }
665            handler::Event::CircuitReqAccepted {
666                dst_peer_id,
667                circuit_id,
668            } => {
669                self.circuits.accepted(circuit_id);
670                self.queued_actions
671                    .push_back(ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
672                        src_peer_id: event_source,
673                        dst_peer_id,
674                    }));
675            }
676            handler::Event::CircuitReqAcceptFailed {
677                dst_peer_id,
678                circuit_id,
679                error,
680            } => {
681                self.circuits.remove(circuit_id);
682                #[allow(deprecated)]
683                self.queued_actions.push_back(ToSwarm::GenerateEvent(
684                    Event::CircuitReqAcceptFailed {
685                        src_peer_id: event_source,
686                        dst_peer_id,
687                        error,
688                    },
689                ));
690            }
691            handler::Event::CircuitClosed {
692                dst_peer_id,
693                circuit_id,
694                error,
695            } => {
696                self.circuits.remove(circuit_id);
697
698                self.queued_actions
699                    .push_back(ToSwarm::GenerateEvent(Event::CircuitClosed {
700                        src_peer_id: event_source,
701                        dst_peer_id,
702                        error,
703                    }));
704            }
705        }
706    }
707
708    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
709    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
710        if let Some(to_swarm) = self.queued_actions.pop_front() {
711            return Poll::Ready(to_swarm);
712        }
713
714        Poll::Pending
715    }
716}
717
/// Bookkeeping of all circuits the relay currently tracks, keyed by a
/// locally assigned [`CircuitId`].
#[derive(Default)]
struct CircuitsTracker {
    /// ID handed to the next inserted circuit; incremented on every insert.
    next_id: CircuitId,
    /// Tracked circuits, both those being accepted and those accepted.
    circuits: HashMap<CircuitId, Circuit>,
}
723
724impl CircuitsTracker {
725    fn len(&self) -> usize {
726        self.circuits.len()
727    }
728
729    fn insert(&mut self, circuit: Circuit) -> CircuitId {
730        let id = self.next_id;
731        self.next_id = self.next_id + 1;
732
733        self.circuits.insert(id, circuit);
734
735        id
736    }
737
738    fn accepted(&mut self, circuit_id: CircuitId) {
739        if let Some(c) = self.circuits.get_mut(&circuit_id) {
740            c.status = CircuitStatus::Accepted;
741        };
742    }
743
744    fn remove(&mut self, circuit_id: CircuitId) -> Option<Circuit> {
745        self.circuits.remove(&circuit_id)
746    }
747
748    fn remove_by_connection(
749        &mut self,
750        peer_id: PeerId,
751        connection_id: ConnectionId,
752    ) -> Vec<Circuit> {
753        let mut removed = vec![];
754
755        self.circuits.retain(|_circuit_id, circuit| {
756            let is_src =
757                circuit.src_peer_id == peer_id && circuit.src_connection_id == connection_id;
758            let is_dst =
759                circuit.dst_peer_id == peer_id && circuit.dst_connection_id == connection_id;
760
761            if is_src || is_dst {
762                removed.push(circuit.clone());
763                // Remove circuit from HashMap.
764                false
765            } else {
766                // Retain circuit in HashMap.
767                true
768            }
769        });
770
771        removed
772    }
773
774    fn num_circuits_of_peer(&self, peer: PeerId) -> usize {
775        self.circuits
776            .iter()
777            .filter(|(_, c)| c.src_peer_id == peer || c.dst_peer_id == peer)
778            .count()
779    }
780}
781
/// A circuit between a source and a destination peer as tracked by the
/// relay.
#[derive(Clone)]
struct Circuit {
    /// Peer that requested the circuit.
    src_peer_id: PeerId,
    /// Connection on which the circuit request arrived.
    src_connection_id: ConnectionId,
    /// Peer the circuit leads to.
    dst_peer_id: PeerId,
    /// Connection to the destination, taken from the destination's tracked
    /// reservations.
    dst_connection_id: ConnectionId,
    /// Whether the circuit is still being set up or already accepted.
    status: CircuitStatus,
}
790
/// Progress of a tracked [`Circuit`].
#[derive(Clone)]
enum CircuitStatus {
    /// Initial status of a circuit when it starts being tracked; the
    /// handler has not yet reported it as accepted.
    Accepting,
    /// The source's handler reported the circuit request as accepted.
    Accepted,
}
796
/// Numeric identifier of a circuit; the default value is `0`.
#[derive(Default, Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct CircuitId(u64);

/// Advancing an ID by a plain number yields another ID.
impl Add<u64> for CircuitId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        let Self(current) = self;
        Self(current + rhs)
    }
}