fedimint_server/net/
p2p_connection.rs1use std::fmt::Debug;
2use std::io::Cursor;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use bytes::Bytes;
7use futures::{SinkExt, StreamExt};
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use tokio::net::TcpStream;
11use tokio_rustls::TlsStream;
12use tokio_util::codec::{Framed, LengthDelimitedCodec};
13
14pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;
15
16#[async_trait]
17pub trait IP2PConnection<M>: Send + 'static {
18 async fn send(&mut self, message: M) -> anyhow::Result<()>;
19
20 async fn receive(&mut self) -> anyhow::Result<M>;
21
22 fn into_dyn(self) -> DynP2PConnection<M>
23 where
24 Self: Sized,
25 {
26 Box::new(self)
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum LegacyMessage<M> {
32 Message(M),
33 Ping,
34}
35
36#[async_trait]
37impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
38where
39 M: Serialize + DeserializeOwned + Send + 'static,
40{
41 async fn send(&mut self, message: M) -> anyhow::Result<()> {
42 let mut bytes = Vec::new();
43
44 bincode::serialize_into(&mut bytes, &LegacyMessage::Message(message))?;
45
46 SinkExt::send(self, Bytes::from_owner(bytes)).await?;
47
48 Ok(())
49 }
50
51 async fn receive(&mut self) -> anyhow::Result<M> {
52 loop {
53 let bytes = self.next().await.context("Framed stream is closed")??;
54
55 if let Ok(legacy_message) = bincode::deserialize_from(Cursor::new(&bytes)) {
56 match legacy_message {
57 LegacyMessage::Message(message) => return Ok(message),
58 LegacyMessage::Ping => continue,
59 }
60 }
61
62 return Ok(bincode::deserialize_from(Cursor::new(&bytes))?);
63 }
64 }
65}
66
67#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
68pub mod iroh {
69 use async_trait::async_trait;
70 use fedimint_core::encoding::{Decodable, Encodable};
71 use fedimint_core::module::registry::ModuleDecoderRegistry;
72 use iroh::endpoint::Connection;
73
74 use crate::net::p2p_connection::IP2PConnection;
75
76 #[async_trait]
77 impl<M> IP2PConnection<M> for Connection
78 where
79 M: Encodable + Decodable + Send + 'static,
80 {
81 async fn send(&mut self, message: M) -> anyhow::Result<()> {
82 let mut sink = self.open_uni().await?;
83
84 sink.write_all(&message.consensus_encode_to_vec()).await?;
85
86 sink.finish()?;
87
88 Ok(())
89 }
90
91 async fn receive(&mut self) -> anyhow::Result<M> {
92 let bytes = self.accept_uni().await?.read_to_end(1_000_000_000).await?;
93
94 Ok(Decodable::consensus_decode_whole(
95 &bytes,
96 &ModuleDecoderRegistry::default(),
97 )?)
98 }
99 }
100}