1pub(crate) mod handler;
24pub(crate) mod transport;
25
26use std::{
27 collections::{hash_map, HashMap, VecDeque},
28 convert::Infallible,
29 io::{Error, ErrorKind, IoSlice},
30 pin::Pin,
31 task::{Context, Poll},
32};
33
34use bytes::Bytes;
35use either::Either;
36use futures::{
37 channel::mpsc::Receiver,
38 future::{BoxFuture, FutureExt},
39 io::{AsyncRead, AsyncWrite},
40 ready,
41 stream::StreamExt,
42};
43use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr};
44use libp2p_identity::PeerId;
45use libp2p_swarm::{
46 behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
47 dial_opts::DialOpts,
48 dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
49 NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
50};
51use transport::Transport;
52
53use crate::{
54 multiaddr_ext::MultiaddrExt,
55 priv_client::handler::Handler,
56 protocol::{self, inbound_stop},
57};
58
59#[derive(Debug)]
61pub enum Event {
62 ReservationReqAccepted {
64 relay_peer_id: PeerId,
65 renewal: bool,
67 limit: Option<protocol::Limit>,
68 },
69 OutboundCircuitEstablished {
70 relay_peer_id: PeerId,
71 limit: Option<protocol::Limit>,
72 },
73 InboundCircuitEstablished {
75 src_peer_id: PeerId,
76 limit: Option<protocol::Limit>,
77 },
78}
79
80#[derive(Debug, Copy, Clone, PartialEq, Eq)]
81enum ReservationStatus {
82 Pending,
83 Confirmed,
84}
85
86pub struct Behaviour {
89 local_peer_id: PeerId,
90
91 from_transport: Receiver<transport::TransportToBehaviourMsg>,
92 directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,
95
96 reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,
101
102 queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,
104
105 pending_handler_commands: HashMap<ConnectionId, handler::In>,
106}
107
108pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
110 let (transport, from_transport) = Transport::new();
111 let behaviour = Behaviour {
112 local_peer_id,
113 from_transport,
114 directly_connected_peers: Default::default(),
115 reservation_addresses: Default::default(),
116 queued_actions: Default::default(),
117 pending_handler_commands: Default::default(),
118 };
119 (transport, behaviour)
120}
121
122impl Behaviour {
123 fn on_connection_closed(
124 &mut self,
125 ConnectionClosed {
126 peer_id,
127 connection_id,
128 endpoint,
129 ..
130 }: ConnectionClosed,
131 ) {
132 if !endpoint.is_relayed() {
133 match self.directly_connected_peers.entry(peer_id) {
134 hash_map::Entry::Occupied(mut connections) => {
135 let position = connections
136 .get()
137 .iter()
138 .position(|c| c == &connection_id)
139 .expect("Connection to be known.");
140 connections.get_mut().remove(position);
141
142 if connections.get().is_empty() {
143 connections.remove();
144 }
145 }
146 hash_map::Entry::Vacant(_) => {
147 unreachable!("`on_connection_closed` for unconnected peer.")
148 }
149 };
150 if let Some((addr, ReservationStatus::Confirmed)) =
151 self.reservation_addresses.remove(&connection_id)
152 {
153 self.queued_actions
154 .push_back(ToSwarm::ExternalAddrExpired(addr));
155 }
156 }
157 }
158}
159
160impl NetworkBehaviour for Behaviour {
161 type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
162 type ToSwarm = Event;
163
164 fn handle_established_inbound_connection(
165 &mut self,
166 connection_id: ConnectionId,
167 peer: PeerId,
168 local_addr: &Multiaddr,
169 remote_addr: &Multiaddr,
170 ) -> Result<THandler<Self>, ConnectionDenied> {
171 if local_addr.is_relayed() {
172 return Ok(Either::Right(dummy::ConnectionHandler));
173 }
174 let mut handler = Handler::new(self.local_peer_id, peer, remote_addr.clone());
175
176 if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
177 handler.on_behaviour_event(event)
178 }
179
180 Ok(Either::Left(handler))
181 }
182
183 fn handle_established_outbound_connection(
184 &mut self,
185 connection_id: ConnectionId,
186 peer: PeerId,
187 addr: &Multiaddr,
188 _: Endpoint,
189 _: PortUse,
190 ) -> Result<THandler<Self>, ConnectionDenied> {
191 if addr.is_relayed() {
192 return Ok(Either::Right(dummy::ConnectionHandler));
193 }
194
195 let mut handler = Handler::new(self.local_peer_id, peer, addr.clone());
196
197 if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
198 handler.on_behaviour_event(event)
199 }
200
201 Ok(Either::Left(handler))
202 }
203
204 fn on_swarm_event(&mut self, event: FromSwarm) {
205 match event {
206 FromSwarm::ConnectionEstablished(ConnectionEstablished {
207 peer_id,
208 connection_id,
209 endpoint,
210 ..
211 }) => {
212 if !endpoint.is_relayed() {
213 self.directly_connected_peers
214 .entry(peer_id)
215 .or_default()
216 .push(connection_id);
217 }
218
219 if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
220 self.queued_actions.push_back(ToSwarm::NotifyHandler {
221 peer_id,
222 handler: NotifyHandler::One(connection_id),
223 event: Either::Left(event),
224 })
225 }
226 }
227 FromSwarm::ConnectionClosed(connection_closed) => {
228 self.on_connection_closed(connection_closed)
229 }
230 FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
231 self.reservation_addresses.remove(&connection_id);
232 self.pending_handler_commands.remove(&connection_id);
233 }
234 _ => {}
235 }
236 }
237
238 fn on_connection_handler_event(
239 &mut self,
240 event_source: PeerId,
241 connection: ConnectionId,
242 handler_event: THandlerOutEvent<Self>,
243 ) {
244 let handler_event = match handler_event {
245 Either::Left(e) => e,
246 #[allow(unreachable_patterns)]
248 Either::Right(v) => libp2p_core::util::unreachable(v),
249 };
250
251 let event = match handler_event {
252 handler::Event::ReservationReqAccepted { renewal, limit } => {
253 let (addr, status) = self
254 .reservation_addresses
255 .get_mut(&connection)
256 .expect("Relay connection exist");
257
258 if !renewal && *status == ReservationStatus::Pending {
259 *status = ReservationStatus::Confirmed;
260 self.queued_actions
261 .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
262 }
263
264 Event::ReservationReqAccepted {
265 relay_peer_id: event_source,
266 renewal,
267 limit,
268 }
269 }
270 handler::Event::OutboundCircuitEstablished { limit } => {
271 Event::OutboundCircuitEstablished {
272 relay_peer_id: event_source,
273 limit,
274 }
275 }
276 handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
277 Event::InboundCircuitEstablished { src_peer_id, limit }
278 }
279 };
280
281 self.queued_actions.push_back(ToSwarm::GenerateEvent(event));
282 }
283
284 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
285 fn poll(
286 &mut self,
287 cx: &mut Context<'_>,
288 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
289 if let Some(action) = self.queued_actions.pop_front() {
290 return Poll::Ready(action);
291 }
292
293 let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
294 Some(transport::TransportToBehaviourMsg::ListenReq {
295 relay_peer_id,
296 relay_addr,
297 to_listener,
298 }) => {
299 match self
300 .directly_connected_peers
301 .get(&relay_peer_id)
302 .and_then(|cs| cs.first())
303 {
304 Some(connection_id) => {
305 self.reservation_addresses.insert(
306 *connection_id,
307 (
308 relay_addr
309 .with(Protocol::P2p(relay_peer_id))
310 .with(Protocol::P2pCircuit)
311 .with(Protocol::P2p(self.local_peer_id)),
312 ReservationStatus::Pending,
313 ),
314 );
315
316 ToSwarm::NotifyHandler {
317 peer_id: relay_peer_id,
318 handler: NotifyHandler::One(*connection_id),
319 event: Either::Left(handler::In::Reserve { to_listener }),
320 }
321 }
322 None => {
323 let opts = DialOpts::peer_id(relay_peer_id)
324 .addresses(vec![relay_addr.clone()])
325 .extend_addresses_through_behaviour()
326 .build();
327 let relayed_connection_id = opts.connection_id();
328
329 self.reservation_addresses.insert(
330 relayed_connection_id,
331 (
332 relay_addr
333 .with(Protocol::P2p(relay_peer_id))
334 .with(Protocol::P2pCircuit)
335 .with(Protocol::P2p(self.local_peer_id)),
336 ReservationStatus::Pending,
337 ),
338 );
339
340 self.pending_handler_commands
341 .insert(relayed_connection_id, handler::In::Reserve { to_listener });
342 ToSwarm::Dial { opts }
343 }
344 }
345 }
346 Some(transport::TransportToBehaviourMsg::DialReq {
347 relay_addr,
348 relay_peer_id,
349 dst_peer_id,
350 send_back,
351 ..
352 }) => {
353 match self
354 .directly_connected_peers
355 .get(&relay_peer_id)
356 .and_then(|cs| cs.first())
357 {
358 Some(connection_id) => ToSwarm::NotifyHandler {
359 peer_id: relay_peer_id,
360 handler: NotifyHandler::One(*connection_id),
361 event: Either::Left(handler::In::EstablishCircuit {
362 to_dial: send_back,
363 dst_peer_id,
364 }),
365 },
366 None => {
367 let opts = DialOpts::peer_id(relay_peer_id)
368 .addresses(vec![relay_addr])
369 .extend_addresses_through_behaviour()
370 .build();
371 let connection_id = opts.connection_id();
372
373 self.pending_handler_commands.insert(
374 connection_id,
375 handler::In::EstablishCircuit {
376 to_dial: send_back,
377 dst_peer_id,
378 },
379 );
380
381 ToSwarm::Dial { opts }
382 }
383 }
384 }
385 None => unreachable!(
386 "`relay::Behaviour` polled after channel from \
387 `Transport` has been closed. Unreachable under \
388 the assumption that the `client::Behaviour` is never polled after \
389 `client::Transport` is dropped.",
390 ),
391 };
392
393 Poll::Ready(action)
394 }
395}
396
397pub struct Connection {
401 pub(crate) state: ConnectionState,
402}
403
404pub(crate) enum ConnectionState {
405 InboundAccepting {
406 accept: BoxFuture<'static, Result<ConnectionState, Error>>,
407 },
408 Operational {
409 read_buffer: Bytes,
410 substream: Stream,
411 },
412}
413
414impl Unpin for ConnectionState {}
415
416impl ConnectionState {
417 pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self {
418 ConnectionState::InboundAccepting {
419 accept: async {
420 let (substream, read_buffer) = circuit
421 .accept()
422 .await
423 .map_err(|e| Error::new(ErrorKind::Other, e))?;
424 Ok(ConnectionState::Operational {
425 read_buffer,
426 substream,
427 })
428 }
429 .boxed(),
430 }
431 }
432
433 pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self {
434 ConnectionState::Operational {
435 substream,
436 read_buffer,
437 }
438 }
439}
440
441impl AsyncWrite for Connection {
442 fn poll_write(
443 mut self: Pin<&mut Self>,
444 cx: &mut Context,
445 buf: &[u8],
446 ) -> Poll<Result<usize, Error>> {
447 loop {
448 match &mut self.state {
449 ConnectionState::InboundAccepting { accept } => {
450 *self = Connection {
451 state: ready!(accept.poll_unpin(cx))?,
452 };
453 }
454 ConnectionState::Operational { substream, .. } => {
455 return Pin::new(substream).poll_write(cx, buf);
456 }
457 }
458 }
459 }
460 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
461 loop {
462 match &mut self.state {
463 ConnectionState::InboundAccepting { accept } => {
464 *self = Connection {
465 state: ready!(accept.poll_unpin(cx))?,
466 };
467 }
468 ConnectionState::Operational { substream, .. } => {
469 return Pin::new(substream).poll_flush(cx);
470 }
471 }
472 }
473 }
474 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
475 loop {
476 match &mut self.state {
477 ConnectionState::InboundAccepting { accept } => {
478 *self = Connection {
479 state: ready!(accept.poll_unpin(cx))?,
480 };
481 }
482 ConnectionState::Operational { substream, .. } => {
483 return Pin::new(substream).poll_close(cx);
484 }
485 }
486 }
487 }
488
489 fn poll_write_vectored(
490 mut self: Pin<&mut Self>,
491 cx: &mut Context,
492 bufs: &[IoSlice],
493 ) -> Poll<Result<usize, Error>> {
494 loop {
495 match &mut self.state {
496 ConnectionState::InboundAccepting { accept } => {
497 *self = Connection {
498 state: ready!(accept.poll_unpin(cx))?,
499 };
500 }
501 ConnectionState::Operational { substream, .. } => {
502 return Pin::new(substream).poll_write_vectored(cx, bufs);
503 }
504 }
505 }
506 }
507}
508
509impl AsyncRead for Connection {
510 fn poll_read(
511 mut self: Pin<&mut Self>,
512 cx: &mut Context<'_>,
513 buf: &mut [u8],
514 ) -> Poll<Result<usize, Error>> {
515 loop {
516 match &mut self.state {
517 ConnectionState::InboundAccepting { accept } => {
518 *self = Connection {
519 state: ready!(accept.poll_unpin(cx))?,
520 };
521 }
522 ConnectionState::Operational {
523 read_buffer,
524 substream,
525 ..
526 } => {
527 if !read_buffer.is_empty() {
528 let n = std::cmp::min(read_buffer.len(), buf.len());
529 let data = read_buffer.split_to(n);
530 buf[0..n].copy_from_slice(&data[..]);
531 return Poll::Ready(Ok(n));
532 }
533
534 return Pin::new(substream).poll_read(cx, buf);
535 }
536 }
537 }
538 }
539}