fedimint_server/net/
p2p_connector.rs

1//! Provides an abstract network connection interface and multiple
2//! implementations
3
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::sync::Arc;
9
10use anyhow::{ensure, format_err, Context};
11use async_trait::async_trait;
12use fedimint_core::util::SafeUrl;
13use fedimint_core::PeerId;
14use futures::Stream;
15use rustls::ServerName;
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use tokio::net::{TcpListener, TcpStream};
19use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient;
20use tokio_rustls::rustls::RootCertStore;
21use tokio_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream};
22use tokio_stream::wrappers::TcpListenerStream;
23use tokio_stream::StreamExt;
24use tokio_util::codec::LengthDelimitedCodec;
25
26use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
27
28pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
29
30pub type P2PConnectionResult<M> = anyhow::Result<(PeerId, DynP2PConnection<M>)>;
31
32pub type P2PConnectionListener<M> = Pin<Box<dyn Stream<Item = P2PConnectionResult<M>> + Send>>;
33
34/// Allows to connect to peers and to listen for incoming connections.
35/// Connections are message based and should be authenticated and encrypted for
36/// production deployments.
37#[async_trait]
38pub trait IP2PConnector<M>: Send + Sync + 'static {
39    fn peers(&self) -> Vec<PeerId>;
40
41    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
42
43    async fn listen(&self) -> P2PConnectionListener<M>;
44
45    fn into_dyn(self) -> DynP2PConnector<M>
46    where
47        Self: Sized,
48    {
49        Arc::new(self)
50    }
51}
52
53#[derive(Debug, Clone)]
54pub struct TlsConfig {
55    pub private_key: rustls::PrivateKey,
56    pub certificates: BTreeMap<PeerId, rustls::Certificate>,
57    pub peer_names: BTreeMap<PeerId, String>,
58}
59
60/// TCP connector with encryption and authentication
61#[derive(Debug)]
62pub struct TlsTcpConnector {
63    cfg: TlsConfig,
64    p2p_bind_addr: SocketAddr,
65    peers: BTreeMap<PeerId, SafeUrl>,
66    identity: PeerId,
67}
68
69impl TlsTcpConnector {
70    pub fn new(
71        cfg: TlsConfig,
72        p2p_bind_addr: SocketAddr,
73        peers: BTreeMap<PeerId, SafeUrl>,
74        identity: PeerId,
75    ) -> TlsTcpConnector {
76        TlsTcpConnector {
77            cfg,
78            p2p_bind_addr,
79            peers,
80            identity,
81        }
82    }
83}
84
85#[async_trait]
86impl<M> IP2PConnector<M> for TlsTcpConnector
87where
88    M: Serialize + DeserializeOwned + Send + 'static,
89{
90    fn peers(&self) -> Vec<PeerId> {
91        self.peers
92            .keys()
93            .filter(|peer| **peer != self.identity)
94            .copied()
95            .collect()
96    }
97
98    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
99        let mut root_cert_store = RootCertStore::empty();
100
101        for cert in self.cfg.certificates.values() {
102            root_cert_store
103                .add(cert)
104                .expect("Could not add peer certificate");
105        }
106
107        let certificate = self
108            .cfg
109            .certificates
110            .get(&self.identity)
111            .expect("No certificate for ourself found")
112            .clone();
113
114        let cfg = rustls::ClientConfig::builder()
115            .with_safe_defaults()
116            .with_root_certificates(root_cert_store)
117            .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone())
118            .expect("Failed to create TLS config");
119
120        let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]).as_str())
121            .expect("Always a valid DNS name");
122
123        let destination = self
124            .peers
125            .get(&peer)
126            .expect("No url for peer {peer}")
127            .with_port_or_known_default();
128
129        let tls = TlsConnector::from(Arc::new(cfg))
130            .connect(
131                domain,
132                TcpStream::connect(parse_host_port(&destination)?).await?,
133            )
134            .await?;
135
136        let certificate = tls
137            .get_ref()
138            .1
139            .peer_certificates()
140            .context("Peer did not authenticate itself")?
141            .first()
142            .context("Received certificate chain of length zero")?;
143
144        let auth_peer = self
145            .cfg
146            .certificates
147            .iter()
148            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
149            .context("Unknown certificate")?;
150
151        ensure!(auth_peer == peer, "Connected to unexpected peer");
152
153        Ok(LengthDelimitedCodec::builder()
154            .length_field_type::<u64>()
155            .new_framed(TlsStream::Client(tls))
156            .into_dyn())
157    }
158
159    async fn listen(&self) -> P2PConnectionListener<M> {
160        let mut root_cert_store = RootCertStore::empty();
161
162        for cert in self.cfg.certificates.values() {
163            root_cert_store
164                .add(cert)
165                .expect("Could not add peer certificate");
166        }
167
168        let verifier = AllowAnyAuthenticatedClient::new(root_cert_store);
169
170        let certificate = self
171            .cfg
172            .certificates
173            .get(&self.identity)
174            .expect("No certificate for ourself found")
175            .clone();
176
177        let config = rustls::ServerConfig::builder()
178            .with_safe_defaults()
179            .with_client_cert_verifier(Arc::from(verifier))
180            .with_single_cert(vec![certificate], self.cfg.private_key.clone())
181            .expect("Failed to create TLS config");
182
183        let listener = TcpListener::bind(self.p2p_bind_addr)
184            .await
185            .expect("Could not bind to port");
186
187        let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
188
189        let cfg = self.cfg.clone();
190
191        let stream = TcpListenerStream::new(listener).then(move |connection| {
192            Box::pin({
193                let cfg = cfg.clone();
194                let acceptor = acceptor.clone();
195
196                async move {
197                    let tls = acceptor.accept(connection?).await?;
198
199                    let certificate = tls
200                        .get_ref()
201                        .1
202                        .peer_certificates()
203                        .context("Peer did not authenticate itself")?
204                        .first()
205                        .context("Received certificate chain of length zero")?;
206
207                    let auth_peer = cfg
208                        .certificates
209                        .iter()
210                        .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
211                        .context("Unknown certificate")?;
212
213                    let framed = LengthDelimitedCodec::builder()
214                        .length_field_type::<u64>()
215                        .new_framed(TlsStream::Server(tls))
216                        .into_dyn();
217
218                    Ok((auth_peer, framed))
219                }
220            })
221        });
222
223        Box::pin(stream)
224    }
225}
226
227/// Sanitizes name as valid domain name
228pub fn dns_sanitize(name: &str) -> String {
229    let sanitized = name.replace(|c: char| !c.is_ascii_alphanumeric(), "_");
230    format!("peer{sanitized}")
231}
232
233/// Parses the host and port from a url
234pub fn parse_host_port(url: &SafeUrl) -> anyhow::Result<String> {
235    let host = url
236        .host_str()
237        .ok_or_else(|| format_err!("Missing host in {url}"))?;
238    let port = url
239        .port_or_known_default()
240        .ok_or_else(|| format_err!("Missing port in {url}"))?;
241
242    Ok(format!("{host}:{port}"))
243}
244
245#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
246pub mod iroh {
247    use std::collections::BTreeMap;
248
249    use anyhow::Context;
250    use async_trait::async_trait;
251    use fedimint_core::encoding::{Decodable, Encodable};
252    use fedimint_core::PeerId;
253    use iroh::endpoint::Incoming;
254    use iroh::{Endpoint, NodeId, SecretKey};
255
256    use crate::net::p2p_connection::IP2PConnection;
257    use crate::net::p2p_connector::{
258        DynP2PConnection, IP2PConnector, P2PConnectionListener, P2PConnectionResult,
259    };
260
261    #[derive(Debug, Clone)]
262    pub struct IrohConnector {
263        /// Map of all peers' connection information we want to be connected to
264        pub node_ids: BTreeMap<PeerId, NodeId>,
265        /// The Iroh endpoint
266        pub endpoint: Endpoint,
267    }
268
269    const FEDIMINT_ALPN: &[u8] = "FEDIMINT_ALPN".as_bytes();
270
271    impl IrohConnector {
272        pub async fn new(
273            secret_key: SecretKey,
274            node_ids: BTreeMap<PeerId, NodeId>,
275        ) -> anyhow::Result<Self> {
276            let identity = *node_ids
277                .iter()
278                .find(|entry| entry.1 == &secret_key.public())
279                .expect("Our public key is not part of the keyset")
280                .0;
281
282            Ok(Self {
283                node_ids: node_ids
284                    .into_iter()
285                    .filter(|entry| entry.0 != identity)
286                    .collect(),
287                endpoint: Endpoint::builder()
288                    .discovery_n0()
289                    .secret_key(secret_key)
290                    .alpns(vec![FEDIMINT_ALPN.to_vec()])
291                    .bind()
292                    .await?,
293            })
294        }
295    }
296
297    #[async_trait]
298    impl<M> IP2PConnector<M> for IrohConnector
299    where
300        M: Encodable + Decodable + Send + 'static,
301    {
302        fn peers(&self) -> Vec<PeerId> {
303            self.node_ids.keys().copied().collect()
304        }
305
306        async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
307            let node_id = *self
308                .node_ids
309                .get(&peer)
310                .expect("No node id found for peer {peer}");
311
312            let connection = self.endpoint.connect(node_id, FEDIMINT_ALPN).await?;
313
314            Ok(connection.into_dyn())
315        }
316
317        async fn listen(&self) -> P2PConnectionListener<M> {
318            let stream = futures::stream::unfold(self.clone(), move |endpoint| async move {
319                let stream = endpoint.endpoint.accept().await?;
320
321                let result = accept_connection(&endpoint.node_ids, stream).await;
322
323                Some((result, endpoint))
324            });
325
326            Box::pin(stream)
327        }
328    }
329
330    async fn accept_connection<M>(
331        peers: &BTreeMap<PeerId, NodeId>,
332        incoming: Incoming,
333    ) -> P2PConnectionResult<M>
334    where
335        M: Encodable + Decodable + Send + 'static,
336    {
337        let connection = incoming.accept()?.await?;
338
339        let node_id = iroh::endpoint::get_remote_node_id(&connection)?;
340
341        let peer_id = peers
342            .iter()
343            .find(|entry| entry.1 == &node_id)
344            .context("Node id {node_id} is unknown")?
345            .0;
346
347        Ok((*peer_id, connection.into_dyn()))
348    }
349}