libp2p_identify/
protocol.rs1use std::io;
22
23use asynchronous_codec::{FramedRead, FramedWrite};
24use futures::prelude::*;
25use libp2p_core::{multiaddr, Multiaddr};
26use libp2p_identity as identity;
27use libp2p_identity::PublicKey;
28use libp2p_swarm::StreamProtocol;
29use thiserror::Error;
30
31use crate::proto;
32
33const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
34
35pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
36
37pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38
39#[derive(Debug, Clone)]
41pub struct Info {
42 pub public_key: PublicKey,
44 pub protocol_version: String,
47 pub agent_version: String,
50 pub listen_addrs: Vec<Multiaddr>,
52 pub protocols: Vec<StreamProtocol>,
54 pub observed_addr: Multiaddr,
56}
57
58impl Info {
59 pub fn merge(&mut self, info: PushInfo) {
60 if let Some(public_key) = info.public_key {
61 self.public_key = public_key;
62 }
63 if let Some(protocol_version) = info.protocol_version {
64 self.protocol_version = protocol_version;
65 }
66 if let Some(agent_version) = info.agent_version {
67 self.agent_version = agent_version;
68 }
69 if !info.listen_addrs.is_empty() {
70 self.listen_addrs = info.listen_addrs;
71 }
72 if !info.protocols.is_empty() {
73 self.protocols = info.protocols;
74 }
75 if let Some(observed_addr) = info.observed_addr {
76 self.observed_addr = observed_addr;
77 }
78 }
79}
80
81#[derive(Debug, Clone)]
85pub struct PushInfo {
86 pub public_key: Option<PublicKey>,
87 pub protocol_version: Option<String>,
88 pub agent_version: Option<String>,
89 pub listen_addrs: Vec<Multiaddr>,
90 pub protocols: Vec<StreamProtocol>,
91 pub observed_addr: Option<Multiaddr>,
92}
93
94pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
95where
96 T: AsyncWrite + Unpin,
97{
98 tracing::trace!("Sending: {:?}", info);
99
100 let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
101
102 let pubkey_bytes = info.public_key.encode_protobuf();
103
104 let message = proto::Identify {
105 agentVersion: Some(info.agent_version.clone()),
106 protocolVersion: Some(info.protocol_version.clone()),
107 publicKey: Some(pubkey_bytes),
108 listenAddrs: listen_addrs,
109 observedAddr: Some(info.observed_addr.to_vec()),
110 protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
111 };
112
113 let mut framed_io = FramedWrite::new(
114 io,
115 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
116 );
117
118 framed_io.send(message).await?;
119 framed_io.close().await?;
120
121 Ok(info)
122}
123
124pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
125where
126 T: AsyncRead + AsyncWrite + Unpin,
127{
128 let info = recv(socket).await?.try_into()?;
129
130 tracing::trace!(?info, "Received");
131
132 Ok(info)
133}
134
135pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
136where
137 T: AsyncRead + AsyncWrite + Unpin,
138{
139 let info = recv(socket).await?.try_into()?;
140
141 tracing::trace!(?info, "Received");
142
143 Ok(info)
144}
145
146async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
147where
148 T: AsyncRead + AsyncWrite + Unpin,
149{
150 let info = FramedRead::new(
156 socket,
157 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
158 )
159 .next()
160 .await
161 .ok_or(UpgradeError::StreamClosed)??;
162
163 Ok(info)
164}
165
166fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
167 listen_addrs
168 .into_iter()
169 .filter_map(|bytes| match Multiaddr::try_from(bytes) {
170 Ok(a) => Some(a),
171 Err(e) => {
172 tracing::debug!("Unable to parse multiaddr: {e:?}");
173 None
174 }
175 })
176 .collect()
177}
178
179fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
180 protocols
181 .into_iter()
182 .filter_map(|p| match StreamProtocol::try_from_owned(p) {
183 Ok(p) => Some(p),
184 Err(e) => {
185 tracing::debug!("Received invalid protocol from peer: {e}");
186 None
187 }
188 })
189 .collect()
190}
191
192fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
193 public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
194 Ok(k) => Some(k),
195 Err(e) => {
196 tracing::debug!("Unable to decode public key: {e:?}");
197 None
198 }
199 })
200}
201
202fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
203 observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
204 Ok(a) => Some(a),
205 Err(e) => {
206 tracing::debug!("Unable to parse observed multiaddr: {e:?}");
207 None
208 }
209 })
210}
211
212impl TryFrom<proto::Identify> for Info {
213 type Error = UpgradeError;
214
215 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
216 let public_key = {
217 match parse_public_key(msg.publicKey) {
218 Some(key) => key,
219 None => PublicKey::try_decode_protobuf(Default::default())?,
221 }
222 };
223
224 let info = Info {
225 public_key,
226 protocol_version: msg.protocolVersion.unwrap_or_default(),
227 agent_version: msg.agentVersion.unwrap_or_default(),
228 listen_addrs: parse_listen_addrs(msg.listenAddrs),
229 protocols: parse_protocols(msg.protocols),
230 observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
231 };
232
233 Ok(info)
234 }
235}
236
237impl TryFrom<proto::Identify> for PushInfo {
238 type Error = UpgradeError;
239
240 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
241 let info = PushInfo {
242 public_key: parse_public_key(msg.publicKey),
243 protocol_version: msg.protocolVersion,
244 agent_version: msg.agentVersion,
245 listen_addrs: parse_listen_addrs(msg.listenAddrs),
246 protocols: parse_protocols(msg.protocols),
247 observed_addr: parse_observed_addr(msg.observedAddr),
248 };
249
250 Ok(info)
251 }
252}
253
254#[derive(Debug, Error)]
255pub enum UpgradeError {
256 #[error(transparent)]
257 Codec(#[from] quick_protobuf_codec::Error),
258 #[error("I/O interaction failed")]
259 Io(#[from] io::Error),
260 #[error("Stream closed")]
261 StreamClosed,
262 #[error("Failed decoding multiaddr")]
263 Multiaddr(#[from] multiaddr::Error),
264 #[error("Failed decoding public key")]
265 PublicKey(#[from] identity::DecodingError),
266}
267
268#[cfg(test)]
269mod tests {
270 use libp2p_identity as identity;
271
272 use super::*;
273
274 #[test]
275 fn skip_invalid_multiaddr() {
276 let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
277 let valid_multiaddr_bytes = valid_multiaddr.to_vec();
278
279 let invalid_multiaddr = {
280 let a = vec![255; 8];
281 assert!(Multiaddr::try_from(a.clone()).is_err());
282 a
283 };
284
285 let payload = proto::Identify {
286 agentVersion: None,
287 listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
288 observedAddr: None,
289 protocolVersion: None,
290 protocols: vec![],
291 publicKey: Some(
292 identity::Keypair::generate_ed25519()
293 .public()
294 .encode_protobuf(),
295 ),
296 };
297
298 let info = PushInfo::try_from(payload).expect("not to fail");
299
300 assert_eq!(info.listen_addrs, vec![valid_multiaddr])
301 }
302}