libp2p_dcutr/protocol/
inbound.rs1use std::io;
22
23use asynchronous_codec::Framed;
24use futures::prelude::*;
25use libp2p_core::{multiaddr::Protocol, Multiaddr};
26use libp2p_swarm::Stream;
27use thiserror::Error;
28
29use crate::proto;
30
31pub(crate) async fn handshake(
32 stream: Stream,
33 candidates: Vec<Multiaddr>,
34) -> Result<Vec<Multiaddr>, Error> {
35 let mut stream = Framed::new(
36 stream,
37 quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES),
38 );
39
40 let proto::HolePunch { type_pb, ObsAddrs } = stream
41 .next()
42 .await
43 .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??;
44
45 if ObsAddrs.is_empty() {
46 return Err(Error::Protocol(ProtocolViolation::NoAddresses));
47 };
48
49 let obs_addrs = ObsAddrs
50 .into_iter()
51 .filter_map(|a| match Multiaddr::try_from(a.to_vec()) {
52 Ok(a) => Some(a),
53 Err(e) => {
54 tracing::debug!("Unable to parse multiaddr: {e}");
55 None
56 }
57 })
58 .filter(|a| {
60 if a.iter().any(|p| p == Protocol::P2pCircuit) {
61 tracing::debug!(address=%a, "Dropping relayed address");
62 false
63 } else {
64 true
65 }
66 })
67 .collect();
68
69 if !matches!(type_pb, proto::Type::CONNECT) {
70 return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync));
71 }
72
73 let msg = proto::HolePunch {
74 type_pb: proto::Type::CONNECT,
75 ObsAddrs: candidates.into_iter().map(|a| a.to_vec()).collect(),
76 };
77
78 stream.send(msg).await?;
79 let proto::HolePunch { type_pb, .. } = stream
80 .next()
81 .await
82 .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??;
83
84 if !matches!(type_pb, proto::Type::SYNC) {
85 return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect));
86 }
87
88 Ok(obs_addrs)
89}
90
91#[derive(Debug, Error)]
92pub enum Error {
93 #[error("IO error")]
94 Io(#[from] io::Error),
95 #[error("Protocol error")]
96 Protocol(#[from] ProtocolViolation),
97}
98
99impl From<quick_protobuf_codec::Error> for Error {
100 fn from(e: quick_protobuf_codec::Error) -> Self {
101 Error::Protocol(ProtocolViolation::Codec(e))
102 }
103}
104
105#[derive(Debug, Error)]
106pub enum ProtocolViolation {
107 #[error(transparent)]
108 Codec(#[from] quick_protobuf_codec::Error),
109 #[error("Expected at least one address in reservation.")]
110 NoAddresses,
111 #[error("Failed to parse response type field.")]
112 ParseTypeField,
113 #[error("Unexpected message type 'connect'")]
114 UnexpectedTypeConnect,
115 #[error("Unexpected message type 'sync'")]
116 UnexpectedTypeSync,
117}