fedimint_server/net/
p2p_connector.rs1use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::sync::Arc;
9
10use anyhow::{ensure, format_err, Context};
11use async_trait::async_trait;
12use fedimint_core::util::SafeUrl;
13use fedimint_core::PeerId;
14use futures::Stream;
15use rustls::ServerName;
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use tokio::net::{TcpListener, TcpStream};
19use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient;
20use tokio_rustls::rustls::RootCertStore;
21use tokio_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream};
22use tokio_stream::wrappers::TcpListenerStream;
23use tokio_stream::StreamExt;
24use tokio_util::codec::LengthDelimitedCodec;
25
26use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
27
28pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
29
30pub type P2PConnectionResult<M> = anyhow::Result<(PeerId, DynP2PConnection<M>)>;
31
32pub type P2PConnectionListener<M> = Pin<Box<dyn Stream<Item = P2PConnectionResult<M>> + Send>>;
33
34#[async_trait]
38pub trait IP2PConnector<M>: Send + Sync + 'static {
39 fn peers(&self) -> Vec<PeerId>;
40
41 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
42
43 async fn listen(&self) -> P2PConnectionListener<M>;
44
45 fn into_dyn(self) -> DynP2PConnector<M>
46 where
47 Self: Sized,
48 {
49 Arc::new(self)
50 }
51}
52
53#[derive(Debug, Clone)]
54pub struct TlsConfig {
55 pub private_key: rustls::PrivateKey,
56 pub certificates: BTreeMap<PeerId, rustls::Certificate>,
57 pub peer_names: BTreeMap<PeerId, String>,
58}
59
60#[derive(Debug)]
62pub struct TlsTcpConnector {
63 cfg: TlsConfig,
64 p2p_bind_addr: SocketAddr,
65 peers: BTreeMap<PeerId, SafeUrl>,
66 identity: PeerId,
67}
68
69impl TlsTcpConnector {
70 pub fn new(
71 cfg: TlsConfig,
72 p2p_bind_addr: SocketAddr,
73 peers: BTreeMap<PeerId, SafeUrl>,
74 identity: PeerId,
75 ) -> TlsTcpConnector {
76 TlsTcpConnector {
77 cfg,
78 p2p_bind_addr,
79 peers,
80 identity,
81 }
82 }
83}
84
85#[async_trait]
86impl<M> IP2PConnector<M> for TlsTcpConnector
87where
88 M: Serialize + DeserializeOwned + Send + 'static,
89{
90 fn peers(&self) -> Vec<PeerId> {
91 self.peers
92 .keys()
93 .filter(|peer| **peer != self.identity)
94 .copied()
95 .collect()
96 }
97
98 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
99 let mut root_cert_store = RootCertStore::empty();
100
101 for cert in self.cfg.certificates.values() {
102 root_cert_store
103 .add(cert)
104 .expect("Could not add peer certificate");
105 }
106
107 let certificate = self
108 .cfg
109 .certificates
110 .get(&self.identity)
111 .expect("No certificate for ourself found")
112 .clone();
113
114 let cfg = rustls::ClientConfig::builder()
115 .with_safe_defaults()
116 .with_root_certificates(root_cert_store)
117 .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone())
118 .expect("Failed to create TLS config");
119
120 let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]).as_str())
121 .expect("Always a valid DNS name");
122
123 let destination = self
124 .peers
125 .get(&peer)
126 .expect("No url for peer {peer}")
127 .with_port_or_known_default();
128
129 let tls = TlsConnector::from(Arc::new(cfg))
130 .connect(
131 domain,
132 TcpStream::connect(parse_host_port(&destination)?).await?,
133 )
134 .await?;
135
136 let certificate = tls
137 .get_ref()
138 .1
139 .peer_certificates()
140 .context("Peer did not authenticate itself")?
141 .first()
142 .context("Received certificate chain of length zero")?;
143
144 let auth_peer = self
145 .cfg
146 .certificates
147 .iter()
148 .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
149 .context("Unknown certificate")?;
150
151 ensure!(auth_peer == peer, "Connected to unexpected peer");
152
153 Ok(LengthDelimitedCodec::builder()
154 .length_field_type::<u64>()
155 .new_framed(TlsStream::Client(tls))
156 .into_dyn())
157 }
158
159 async fn listen(&self) -> P2PConnectionListener<M> {
160 let mut root_cert_store = RootCertStore::empty();
161
162 for cert in self.cfg.certificates.values() {
163 root_cert_store
164 .add(cert)
165 .expect("Could not add peer certificate");
166 }
167
168 let verifier = AllowAnyAuthenticatedClient::new(root_cert_store);
169
170 let certificate = self
171 .cfg
172 .certificates
173 .get(&self.identity)
174 .expect("No certificate for ourself found")
175 .clone();
176
177 let config = rustls::ServerConfig::builder()
178 .with_safe_defaults()
179 .with_client_cert_verifier(Arc::from(verifier))
180 .with_single_cert(vec![certificate], self.cfg.private_key.clone())
181 .expect("Failed to create TLS config");
182
183 let listener = TcpListener::bind(self.p2p_bind_addr)
184 .await
185 .expect("Could not bind to port");
186
187 let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
188
189 let cfg = self.cfg.clone();
190
191 let stream = TcpListenerStream::new(listener).then(move |connection| {
192 Box::pin({
193 let cfg = cfg.clone();
194 let acceptor = acceptor.clone();
195
196 async move {
197 let tls = acceptor.accept(connection?).await?;
198
199 let certificate = tls
200 .get_ref()
201 .1
202 .peer_certificates()
203 .context("Peer did not authenticate itself")?
204 .first()
205 .context("Received certificate chain of length zero")?;
206
207 let auth_peer = cfg
208 .certificates
209 .iter()
210 .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
211 .context("Unknown certificate")?;
212
213 let framed = LengthDelimitedCodec::builder()
214 .length_field_type::<u64>()
215 .new_framed(TlsStream::Server(tls))
216 .into_dyn();
217
218 Ok((auth_peer, framed))
219 }
220 })
221 });
222
223 Box::pin(stream)
224 }
225}
226
227pub fn dns_sanitize(name: &str) -> String {
229 let sanitized = name.replace(|c: char| !c.is_ascii_alphanumeric(), "_");
230 format!("peer{sanitized}")
231}
232
233pub fn parse_host_port(url: &SafeUrl) -> anyhow::Result<String> {
235 let host = url
236 .host_str()
237 .ok_or_else(|| format_err!("Missing host in {url}"))?;
238 let port = url
239 .port_or_known_default()
240 .ok_or_else(|| format_err!("Missing port in {url}"))?;
241
242 Ok(format!("{host}:{port}"))
243}
244
245#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
246pub mod iroh {
247 use std::collections::BTreeMap;
248
249 use anyhow::Context;
250 use async_trait::async_trait;
251 use fedimint_core::encoding::{Decodable, Encodable};
252 use fedimint_core::PeerId;
253 use iroh::endpoint::Incoming;
254 use iroh::{Endpoint, NodeId, SecretKey};
255
256 use crate::net::p2p_connection::IP2PConnection;
257 use crate::net::p2p_connector::{
258 DynP2PConnection, IP2PConnector, P2PConnectionListener, P2PConnectionResult,
259 };
260
261 #[derive(Debug, Clone)]
262 pub struct IrohConnector {
263 pub node_ids: BTreeMap<PeerId, NodeId>,
265 pub endpoint: Endpoint,
267 }
268
269 const FEDIMINT_ALPN: &[u8] = "FEDIMINT_ALPN".as_bytes();
270
271 impl IrohConnector {
272 pub async fn new(
273 secret_key: SecretKey,
274 node_ids: BTreeMap<PeerId, NodeId>,
275 ) -> anyhow::Result<Self> {
276 let identity = *node_ids
277 .iter()
278 .find(|entry| entry.1 == &secret_key.public())
279 .expect("Our public key is not part of the keyset")
280 .0;
281
282 Ok(Self {
283 node_ids: node_ids
284 .into_iter()
285 .filter(|entry| entry.0 != identity)
286 .collect(),
287 endpoint: Endpoint::builder()
288 .discovery_n0()
289 .secret_key(secret_key)
290 .alpns(vec![FEDIMINT_ALPN.to_vec()])
291 .bind()
292 .await?,
293 })
294 }
295 }
296
297 #[async_trait]
298 impl<M> IP2PConnector<M> for IrohConnector
299 where
300 M: Encodable + Decodable + Send + 'static,
301 {
302 fn peers(&self) -> Vec<PeerId> {
303 self.node_ids.keys().copied().collect()
304 }
305
306 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
307 let node_id = *self
308 .node_ids
309 .get(&peer)
310 .expect("No node id found for peer {peer}");
311
312 let connection = self.endpoint.connect(node_id, FEDIMINT_ALPN).await?;
313
314 Ok(connection.into_dyn())
315 }
316
317 async fn listen(&self) -> P2PConnectionListener<M> {
318 let stream = futures::stream::unfold(self.clone(), move |endpoint| async move {
319 let stream = endpoint.endpoint.accept().await?;
320
321 let result = accept_connection(&endpoint.node_ids, stream).await;
322
323 Some((result, endpoint))
324 });
325
326 Box::pin(stream)
327 }
328 }
329
330 async fn accept_connection<M>(
331 peers: &BTreeMap<PeerId, NodeId>,
332 incoming: Incoming,
333 ) -> P2PConnectionResult<M>
334 where
335 M: Encodable + Decodable + Send + 'static,
336 {
337 let connection = incoming.accept()?.await?;
338
339 let node_id = iroh::endpoint::get_remote_node_id(&connection)?;
340
341 let peer_id = peers
342 .iter()
343 .find(|entry| entry.1 == &node_id)
344 .context("Node id {node_id} is unknown")?
345 .0;
346
347 Ok((*peer_id, connection.into_dyn()))
348 }
349}