libp2p_identify/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::io;
22
23use asynchronous_codec::{FramedRead, FramedWrite};
24use futures::prelude::*;
25use libp2p_core::{multiaddr, Multiaddr};
26use libp2p_identity as identity;
27use libp2p_identity::PublicKey;
28use libp2p_swarm::StreamProtocol;
29use thiserror::Error;
30
31use crate::proto;
32
/// Size limit, in bytes, handed to the protobuf codec used for identify messages.
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;

/// Stream protocol name of the identify protocol.
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");

/// Stream protocol name of the identify push protocol.
pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38
39/// Identify information of a peer sent in protocol messages.
40#[derive(Debug, Clone)]
41pub struct Info {
42    /// The public key of the peer.
43    pub public_key: PublicKey,
44    /// Application-specific version of the protocol family used by the peer,
45    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
46    pub protocol_version: String,
47    /// Name and version of the peer, similar to the `User-Agent` header in
48    /// the HTTP protocol.
49    pub agent_version: String,
50    /// The addresses that the peer is listening on.
51    pub listen_addrs: Vec<Multiaddr>,
52    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
53    pub protocols: Vec<StreamProtocol>,
54    /// Address observed by or for the remote.
55    pub observed_addr: Multiaddr,
56}
57
58impl Info {
59    pub fn merge(&mut self, info: PushInfo) {
60        if let Some(public_key) = info.public_key {
61            self.public_key = public_key;
62        }
63        if let Some(protocol_version) = info.protocol_version {
64            self.protocol_version = protocol_version;
65        }
66        if let Some(agent_version) = info.agent_version {
67            self.agent_version = agent_version;
68        }
69        if !info.listen_addrs.is_empty() {
70            self.listen_addrs = info.listen_addrs;
71        }
72        if !info.protocols.is_empty() {
73            self.protocols = info.protocols;
74        }
75        if let Some(observed_addr) = info.observed_addr {
76            self.observed_addr = observed_addr;
77        }
78    }
79}
80
81/// Identify push information of a peer sent in protocol messages.
82/// Note that missing fields should be ignored, as peers may choose to send partial updates
83/// containing only the fields whose values have changed.
84#[derive(Debug, Clone)]
85pub struct PushInfo {
86    pub public_key: Option<PublicKey>,
87    pub protocol_version: Option<String>,
88    pub agent_version: Option<String>,
89    pub listen_addrs: Vec<Multiaddr>,
90    pub protocols: Vec<StreamProtocol>,
91    pub observed_addr: Option<Multiaddr>,
92}
93
94pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
95where
96    T: AsyncWrite + Unpin,
97{
98    tracing::trace!("Sending: {:?}", info);
99
100    let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
101
102    let pubkey_bytes = info.public_key.encode_protobuf();
103
104    let message = proto::Identify {
105        agentVersion: Some(info.agent_version.clone()),
106        protocolVersion: Some(info.protocol_version.clone()),
107        publicKey: Some(pubkey_bytes),
108        listenAddrs: listen_addrs,
109        observedAddr: Some(info.observed_addr.to_vec()),
110        protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
111    };
112
113    let mut framed_io = FramedWrite::new(
114        io,
115        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
116    );
117
118    framed_io.send(message).await?;
119    framed_io.close().await?;
120
121    Ok(info)
122}
123
124pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
125where
126    T: AsyncRead + AsyncWrite + Unpin,
127{
128    let info = recv(socket).await?.try_into()?;
129
130    tracing::trace!(?info, "Received");
131
132    Ok(info)
133}
134
135pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
136where
137    T: AsyncRead + AsyncWrite + Unpin,
138{
139    let info = recv(socket).await?.try_into()?;
140
141    tracing::trace!(?info, "Received");
142
143    Ok(info)
144}
145
146async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
147where
148    T: AsyncRead + AsyncWrite + Unpin,
149{
150    // Even though we won't write to the stream anymore we don't close it here.
151    // The reason for this is that the `close` call on some transport's require the
152    // remote's ACK, but it could be that the remote already dropped the stream
153    // after finishing their write.
154
155    let info = FramedRead::new(
156        socket,
157        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
158    )
159    .next()
160    .await
161    .ok_or(UpgradeError::StreamClosed)??;
162
163    Ok(info)
164}
165
166fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
167    listen_addrs
168        .into_iter()
169        .filter_map(|bytes| match Multiaddr::try_from(bytes) {
170            Ok(a) => Some(a),
171            Err(e) => {
172                tracing::debug!("Unable to parse multiaddr: {e:?}");
173                None
174            }
175        })
176        .collect()
177}
178
179fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
180    protocols
181        .into_iter()
182        .filter_map(|p| match StreamProtocol::try_from_owned(p) {
183            Ok(p) => Some(p),
184            Err(e) => {
185                tracing::debug!("Received invalid protocol from peer: {e}");
186                None
187            }
188        })
189        .collect()
190}
191
192fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
193    public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
194        Ok(k) => Some(k),
195        Err(e) => {
196            tracing::debug!("Unable to decode public key: {e:?}");
197            None
198        }
199    })
200}
201
202fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
203    observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
204        Ok(a) => Some(a),
205        Err(e) => {
206            tracing::debug!("Unable to parse observed multiaddr: {e:?}");
207            None
208        }
209    })
210}
211
212impl TryFrom<proto::Identify> for Info {
213    type Error = UpgradeError;
214
215    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
216        let public_key = {
217            match parse_public_key(msg.publicKey) {
218                Some(key) => key,
219                // This will always produce a DecodingError if the public key is missing.
220                None => PublicKey::try_decode_protobuf(Default::default())?,
221            }
222        };
223
224        let info = Info {
225            public_key,
226            protocol_version: msg.protocolVersion.unwrap_or_default(),
227            agent_version: msg.agentVersion.unwrap_or_default(),
228            listen_addrs: parse_listen_addrs(msg.listenAddrs),
229            protocols: parse_protocols(msg.protocols),
230            observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
231        };
232
233        Ok(info)
234    }
235}
236
237impl TryFrom<proto::Identify> for PushInfo {
238    type Error = UpgradeError;
239
240    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
241        let info = PushInfo {
242            public_key: parse_public_key(msg.publicKey),
243            protocol_version: msg.protocolVersion,
244            agent_version: msg.agentVersion,
245            listen_addrs: parse_listen_addrs(msg.listenAddrs),
246            protocols: parse_protocols(msg.protocols),
247            observed_addr: parse_observed_addr(msg.observedAddr),
248        };
249
250        Ok(info)
251    }
252}
253
254#[derive(Debug, Error)]
255pub enum UpgradeError {
256    #[error(transparent)]
257    Codec(#[from] quick_protobuf_codec::Error),
258    #[error("I/O interaction failed")]
259    Io(#[from] io::Error),
260    #[error("Stream closed")]
261    StreamClosed,
262    #[error("Failed decoding multiaddr")]
263    Multiaddr(#[from] multiaddr::Error),
264    #[error("Failed decoding public key")]
265    PublicKey(#[from] identity::DecodingError),
266}
267
#[cfg(test)]
mod tests {
    use libp2p_identity as identity;

    use super::*;

    /// A listen address that cannot be parsed is dropped, while a valid one in
    /// the same message is kept.
    #[test]
    fn skip_invalid_multiaddr() {
        let good_addr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();

        // Sanity-check that these bytes really are rejected as a multiaddr.
        let bad_addr_bytes = vec![255; 8];
        assert!(Multiaddr::try_from(bad_addr_bytes.clone()).is_err());

        let public_key_bytes = identity::Keypair::generate_ed25519()
            .public()
            .encode_protobuf();

        let payload = proto::Identify {
            agentVersion: None,
            listenAddrs: vec![good_addr.to_vec(), bad_addr_bytes],
            observedAddr: None,
            protocolVersion: None,
            protocols: vec![],
            publicKey: Some(public_key_bytes),
        };

        let info = PushInfo::try_from(payload).expect("not to fail");

        assert_eq!(info.listen_addrs, vec![good_addr])
    }
}