libp2p_relay/
priv_client.rs

1// Copyright 2021 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`NetworkBehaviour`] to act as a circuit relay v2 **client**.
22
23pub(crate) mod handler;
24pub(crate) mod transport;
25
26use std::{
27    collections::{hash_map, HashMap, VecDeque},
28    convert::Infallible,
29    io::{Error, ErrorKind, IoSlice},
30    pin::Pin,
31    task::{Context, Poll},
32};
33
34use bytes::Bytes;
35use either::Either;
36use futures::{
37    channel::mpsc::Receiver,
38    future::{BoxFuture, FutureExt},
39    io::{AsyncRead, AsyncWrite},
40    ready,
41    stream::StreamExt,
42};
43use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr};
44use libp2p_identity::PeerId;
45use libp2p_swarm::{
46    behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
47    dial_opts::DialOpts,
48    dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
49    NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
50};
51use transport::Transport;
52
53use crate::{
54    multiaddr_ext::MultiaddrExt,
55    priv_client::handler::Handler,
56    protocol::{self, inbound_stop},
57};
58
59/// The events produced by the client `Behaviour`.
60#[derive(Debug)]
61pub enum Event {
62    /// An outbound reservation has been accepted.
63    ReservationReqAccepted {
64        relay_peer_id: PeerId,
65        /// Indicates whether the request replaces an existing reservation.
66        renewal: bool,
67        limit: Option<protocol::Limit>,
68    },
69    OutboundCircuitEstablished {
70        relay_peer_id: PeerId,
71        limit: Option<protocol::Limit>,
72    },
73    /// An inbound circuit has been established.
74    InboundCircuitEstablished {
75        src_peer_id: PeerId,
76        limit: Option<protocol::Limit>,
77    },
78}
79
80#[derive(Debug, Copy, Clone, PartialEq, Eq)]
81enum ReservationStatus {
82    Pending,
83    Confirmed,
84}
85
86/// [`NetworkBehaviour`] implementation of the relay client
87/// functionality of the circuit relay v2 protocol.
88pub struct Behaviour {
89    local_peer_id: PeerId,
90
91    from_transport: Receiver<transport::TransportToBehaviourMsg>,
92    /// Set of directly connected peers, i.e. not connected via a relayed
93    /// connection.
94    directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,
95
96    /// Stores the address of a pending or confirmed reservation.
97    ///
98    /// This is indexed by the [`ConnectionId`] to a relay server and the address is the
99    /// `/p2p-circuit` address we reserved on it.
100    reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,
101
102    /// Queue of actions to return when polled.
103    queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,
104
105    pending_handler_commands: HashMap<ConnectionId, handler::In>,
106}
107
108/// Create a new client relay [`Behaviour`] with it's corresponding [`Transport`].
109pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
110    let (transport, from_transport) = Transport::new();
111    let behaviour = Behaviour {
112        local_peer_id,
113        from_transport,
114        directly_connected_peers: Default::default(),
115        reservation_addresses: Default::default(),
116        queued_actions: Default::default(),
117        pending_handler_commands: Default::default(),
118    };
119    (transport, behaviour)
120}
121
122impl Behaviour {
123    fn on_connection_closed(
124        &mut self,
125        ConnectionClosed {
126            peer_id,
127            connection_id,
128            endpoint,
129            ..
130        }: ConnectionClosed,
131    ) {
132        if !endpoint.is_relayed() {
133            match self.directly_connected_peers.entry(peer_id) {
134                hash_map::Entry::Occupied(mut connections) => {
135                    let position = connections
136                        .get()
137                        .iter()
138                        .position(|c| c == &connection_id)
139                        .expect("Connection to be known.");
140                    connections.get_mut().remove(position);
141
142                    if connections.get().is_empty() {
143                        connections.remove();
144                    }
145                }
146                hash_map::Entry::Vacant(_) => {
147                    unreachable!("`on_connection_closed` for unconnected peer.")
148                }
149            };
150            if let Some((addr, ReservationStatus::Confirmed)) =
151                self.reservation_addresses.remove(&connection_id)
152            {
153                self.queued_actions
154                    .push_back(ToSwarm::ExternalAddrExpired(addr));
155            }
156        }
157    }
158}
159
160impl NetworkBehaviour for Behaviour {
161    type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
162    type ToSwarm = Event;
163
164    fn handle_established_inbound_connection(
165        &mut self,
166        connection_id: ConnectionId,
167        peer: PeerId,
168        local_addr: &Multiaddr,
169        remote_addr: &Multiaddr,
170    ) -> Result<THandler<Self>, ConnectionDenied> {
171        if local_addr.is_relayed() {
172            return Ok(Either::Right(dummy::ConnectionHandler));
173        }
174        let mut handler = Handler::new(self.local_peer_id, peer, remote_addr.clone());
175
176        if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
177            handler.on_behaviour_event(event)
178        }
179
180        Ok(Either::Left(handler))
181    }
182
183    fn handle_established_outbound_connection(
184        &mut self,
185        connection_id: ConnectionId,
186        peer: PeerId,
187        addr: &Multiaddr,
188        _: Endpoint,
189        _: PortUse,
190    ) -> Result<THandler<Self>, ConnectionDenied> {
191        if addr.is_relayed() {
192            return Ok(Either::Right(dummy::ConnectionHandler));
193        }
194
195        let mut handler = Handler::new(self.local_peer_id, peer, addr.clone());
196
197        if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
198            handler.on_behaviour_event(event)
199        }
200
201        Ok(Either::Left(handler))
202    }
203
204    fn on_swarm_event(&mut self, event: FromSwarm) {
205        match event {
206            FromSwarm::ConnectionEstablished(ConnectionEstablished {
207                peer_id,
208                connection_id,
209                endpoint,
210                ..
211            }) => {
212                if !endpoint.is_relayed() {
213                    self.directly_connected_peers
214                        .entry(peer_id)
215                        .or_default()
216                        .push(connection_id);
217                }
218
219                if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
220                    self.queued_actions.push_back(ToSwarm::NotifyHandler {
221                        peer_id,
222                        handler: NotifyHandler::One(connection_id),
223                        event: Either::Left(event),
224                    })
225                }
226            }
227            FromSwarm::ConnectionClosed(connection_closed) => {
228                self.on_connection_closed(connection_closed)
229            }
230            FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
231                self.reservation_addresses.remove(&connection_id);
232                self.pending_handler_commands.remove(&connection_id);
233            }
234            _ => {}
235        }
236    }
237
238    fn on_connection_handler_event(
239        &mut self,
240        event_source: PeerId,
241        connection: ConnectionId,
242        handler_event: THandlerOutEvent<Self>,
243    ) {
244        let handler_event = match handler_event {
245            Either::Left(e) => e,
246            // TODO: remove when Rust 1.82 is MSRV
247            #[allow(unreachable_patterns)]
248            Either::Right(v) => libp2p_core::util::unreachable(v),
249        };
250
251        let event = match handler_event {
252            handler::Event::ReservationReqAccepted { renewal, limit } => {
253                let (addr, status) = self
254                    .reservation_addresses
255                    .get_mut(&connection)
256                    .expect("Relay connection exist");
257
258                if !renewal && *status == ReservationStatus::Pending {
259                    *status = ReservationStatus::Confirmed;
260                    self.queued_actions
261                        .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
262                }
263
264                Event::ReservationReqAccepted {
265                    relay_peer_id: event_source,
266                    renewal,
267                    limit,
268                }
269            }
270            handler::Event::OutboundCircuitEstablished { limit } => {
271                Event::OutboundCircuitEstablished {
272                    relay_peer_id: event_source,
273                    limit,
274                }
275            }
276            handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
277                Event::InboundCircuitEstablished { src_peer_id, limit }
278            }
279        };
280
281        self.queued_actions.push_back(ToSwarm::GenerateEvent(event));
282    }
283
284    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
285    fn poll(
286        &mut self,
287        cx: &mut Context<'_>,
288    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
289        if let Some(action) = self.queued_actions.pop_front() {
290            return Poll::Ready(action);
291        }
292
293        let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
294            Some(transport::TransportToBehaviourMsg::ListenReq {
295                relay_peer_id,
296                relay_addr,
297                to_listener,
298            }) => {
299                match self
300                    .directly_connected_peers
301                    .get(&relay_peer_id)
302                    .and_then(|cs| cs.first())
303                {
304                    Some(connection_id) => {
305                        self.reservation_addresses.insert(
306                            *connection_id,
307                            (
308                                relay_addr
309                                    .with(Protocol::P2p(relay_peer_id))
310                                    .with(Protocol::P2pCircuit)
311                                    .with(Protocol::P2p(self.local_peer_id)),
312                                ReservationStatus::Pending,
313                            ),
314                        );
315
316                        ToSwarm::NotifyHandler {
317                            peer_id: relay_peer_id,
318                            handler: NotifyHandler::One(*connection_id),
319                            event: Either::Left(handler::In::Reserve { to_listener }),
320                        }
321                    }
322                    None => {
323                        let opts = DialOpts::peer_id(relay_peer_id)
324                            .addresses(vec![relay_addr.clone()])
325                            .extend_addresses_through_behaviour()
326                            .build();
327                        let relayed_connection_id = opts.connection_id();
328
329                        self.reservation_addresses.insert(
330                            relayed_connection_id,
331                            (
332                                relay_addr
333                                    .with(Protocol::P2p(relay_peer_id))
334                                    .with(Protocol::P2pCircuit)
335                                    .with(Protocol::P2p(self.local_peer_id)),
336                                ReservationStatus::Pending,
337                            ),
338                        );
339
340                        self.pending_handler_commands
341                            .insert(relayed_connection_id, handler::In::Reserve { to_listener });
342                        ToSwarm::Dial { opts }
343                    }
344                }
345            }
346            Some(transport::TransportToBehaviourMsg::DialReq {
347                relay_addr,
348                relay_peer_id,
349                dst_peer_id,
350                send_back,
351                ..
352            }) => {
353                match self
354                    .directly_connected_peers
355                    .get(&relay_peer_id)
356                    .and_then(|cs| cs.first())
357                {
358                    Some(connection_id) => ToSwarm::NotifyHandler {
359                        peer_id: relay_peer_id,
360                        handler: NotifyHandler::One(*connection_id),
361                        event: Either::Left(handler::In::EstablishCircuit {
362                            to_dial: send_back,
363                            dst_peer_id,
364                        }),
365                    },
366                    None => {
367                        let opts = DialOpts::peer_id(relay_peer_id)
368                            .addresses(vec![relay_addr])
369                            .extend_addresses_through_behaviour()
370                            .build();
371                        let connection_id = opts.connection_id();
372
373                        self.pending_handler_commands.insert(
374                            connection_id,
375                            handler::In::EstablishCircuit {
376                                to_dial: send_back,
377                                dst_peer_id,
378                            },
379                        );
380
381                        ToSwarm::Dial { opts }
382                    }
383                }
384            }
385            None => unreachable!(
386                "`relay::Behaviour` polled after channel from \
387                     `Transport` has been closed. Unreachable under \
388                     the assumption that the `client::Behaviour` is never polled after \
389                     `client::Transport` is dropped.",
390            ),
391        };
392
393        Poll::Ready(action)
394    }
395}
396
397/// Represents a connection to another peer via a relay.
398///
399/// Internally, this uses a stream to the relay.
400pub struct Connection {
401    pub(crate) state: ConnectionState,
402}
403
404pub(crate) enum ConnectionState {
405    InboundAccepting {
406        accept: BoxFuture<'static, Result<ConnectionState, Error>>,
407    },
408    Operational {
409        read_buffer: Bytes,
410        substream: Stream,
411    },
412}
413
414impl Unpin for ConnectionState {}
415
416impl ConnectionState {
417    pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self {
418        ConnectionState::InboundAccepting {
419            accept: async {
420                let (substream, read_buffer) = circuit
421                    .accept()
422                    .await
423                    .map_err(|e| Error::new(ErrorKind::Other, e))?;
424                Ok(ConnectionState::Operational {
425                    read_buffer,
426                    substream,
427                })
428            }
429            .boxed(),
430        }
431    }
432
433    pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self {
434        ConnectionState::Operational {
435            substream,
436            read_buffer,
437        }
438    }
439}
440
441impl AsyncWrite for Connection {
442    fn poll_write(
443        mut self: Pin<&mut Self>,
444        cx: &mut Context,
445        buf: &[u8],
446    ) -> Poll<Result<usize, Error>> {
447        loop {
448            match &mut self.state {
449                ConnectionState::InboundAccepting { accept } => {
450                    *self = Connection {
451                        state: ready!(accept.poll_unpin(cx))?,
452                    };
453                }
454                ConnectionState::Operational { substream, .. } => {
455                    return Pin::new(substream).poll_write(cx, buf);
456                }
457            }
458        }
459    }
460    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
461        loop {
462            match &mut self.state {
463                ConnectionState::InboundAccepting { accept } => {
464                    *self = Connection {
465                        state: ready!(accept.poll_unpin(cx))?,
466                    };
467                }
468                ConnectionState::Operational { substream, .. } => {
469                    return Pin::new(substream).poll_flush(cx);
470                }
471            }
472        }
473    }
474    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
475        loop {
476            match &mut self.state {
477                ConnectionState::InboundAccepting { accept } => {
478                    *self = Connection {
479                        state: ready!(accept.poll_unpin(cx))?,
480                    };
481                }
482                ConnectionState::Operational { substream, .. } => {
483                    return Pin::new(substream).poll_close(cx);
484                }
485            }
486        }
487    }
488
489    fn poll_write_vectored(
490        mut self: Pin<&mut Self>,
491        cx: &mut Context,
492        bufs: &[IoSlice],
493    ) -> Poll<Result<usize, Error>> {
494        loop {
495            match &mut self.state {
496                ConnectionState::InboundAccepting { accept } => {
497                    *self = Connection {
498                        state: ready!(accept.poll_unpin(cx))?,
499                    };
500                }
501                ConnectionState::Operational { substream, .. } => {
502                    return Pin::new(substream).poll_write_vectored(cx, bufs);
503                }
504            }
505        }
506    }
507}
508
509impl AsyncRead for Connection {
510    fn poll_read(
511        mut self: Pin<&mut Self>,
512        cx: &mut Context<'_>,
513        buf: &mut [u8],
514    ) -> Poll<Result<usize, Error>> {
515        loop {
516            match &mut self.state {
517                ConnectionState::InboundAccepting { accept } => {
518                    *self = Connection {
519                        state: ready!(accept.poll_unpin(cx))?,
520                    };
521                }
522                ConnectionState::Operational {
523                    read_buffer,
524                    substream,
525                    ..
526                } => {
527                    if !read_buffer.is_empty() {
528                        let n = std::cmp::min(read_buffer.len(), buf.len());
529                        let data = read_buffer.split_to(n);
530                        buf[0..n].copy_from_slice(&data[..]);
531                        return Poll::Ready(Ok(n));
532                    }
533
534                    return Pin::new(substream).poll_read(cx, buf);
535                }
536            }
537        }
538    }
539}