libp2p_dcutr/protocol/
outbound.rs1use std::io;
22
23use asynchronous_codec::Framed;
24use futures::prelude::*;
25use futures_timer::Delay;
26use libp2p_core::{multiaddr::Protocol, Multiaddr};
27use libp2p_swarm::Stream;
28use thiserror::Error;
29use web_time::Instant;
30
31use crate::{proto, PROTOCOL_NAME};
32
33pub(crate) async fn handshake(
34 stream: Stream,
35 candidates: Vec<Multiaddr>,
36) -> Result<Vec<Multiaddr>, Error> {
37 let mut stream = Framed::new(
38 stream,
39 quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES),
40 );
41
42 let msg = proto::HolePunch {
43 type_pb: proto::Type::CONNECT,
44 ObsAddrs: candidates.into_iter().map(|a| a.to_vec()).collect(),
45 };
46
47 stream.send(msg).await?;
48
49 let sent_time = Instant::now();
50
51 let proto::HolePunch { type_pb, ObsAddrs } = stream
52 .next()
53 .await
54 .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??;
55
56 let rtt = sent_time.elapsed();
57
58 if !matches!(type_pb, proto::Type::CONNECT) {
59 return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync));
60 }
61
62 if ObsAddrs.is_empty() {
63 return Err(Error::Protocol(ProtocolViolation::NoAddresses));
64 }
65
66 let obs_addrs = ObsAddrs
67 .into_iter()
68 .filter_map(|a| match Multiaddr::try_from(a.to_vec()) {
69 Ok(a) => Some(a),
70 Err(e) => {
71 tracing::debug!("Unable to parse multiaddr: {e}");
72 None
73 }
74 })
75 .filter(|a| {
77 if a.iter().any(|p| p == Protocol::P2pCircuit) {
78 tracing::debug!(address=%a, "Dropping relayed address");
79 false
80 } else {
81 true
82 }
83 })
84 .collect();
85
86 let msg = proto::HolePunch {
87 type_pb: proto::Type::SYNC,
88 ObsAddrs: vec![],
89 };
90
91 stream.send(msg).await?;
92
93 Delay::new(rtt / 2).await;
94
95 Ok(obs_addrs)
96}
97
98#[derive(Debug, Error)]
99pub enum Error {
100 #[error("IO error")]
101 Io(#[from] io::Error),
102 #[error("Remote does not support the `{PROTOCOL_NAME}` protocol")]
103 Unsupported,
104 #[error("Protocol error")]
105 Protocol(#[from] ProtocolViolation),
106}
107
108impl From<quick_protobuf_codec::Error> for Error {
109 fn from(e: quick_protobuf_codec::Error) -> Self {
110 Error::Protocol(ProtocolViolation::Codec(e))
111 }
112}
113
114#[derive(Debug, Error)]
115pub enum ProtocolViolation {
116 #[error(transparent)]
117 Codec(#[from] quick_protobuf_codec::Error),
118 #[error("Expected 'status' field to be set.")]
119 MissingStatusField,
120 #[error("Expected 'reservation' field to be set.")]
121 MissingReservationField,
122 #[error("Expected at least one address in reservation.")]
123 NoAddresses,
124 #[error("Invalid expiration timestamp in reservation.")]
125 InvalidReservationExpiration,
126 #[error("Failed to parse response type field.")]
127 ParseTypeField,
128 #[error("Unexpected message type 'connect'")]
129 UnexpectedTypeConnect,
130 #[error("Unexpected message type 'sync'")]
131 UnexpectedTypeSync,
132 #[error("Failed to parse response type field.")]
133 ParseStatusField,
134}