fedimint_server/net/
p2p_connection.rs

1use std::fmt::Debug;
2use std::io::Cursor;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use bytes::Bytes;
7use futures::{SinkExt, StreamExt};
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use tokio::net::TcpStream;
11use tokio_rustls::TlsStream;
12use tokio_util::codec::{Framed, LengthDelimitedCodec};
13
14pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;
15
16#[async_trait]
17pub trait IP2PConnection<M>: Send + 'static {
18    async fn send(&mut self, message: M) -> anyhow::Result<()>;
19
20    async fn receive(&mut self) -> anyhow::Result<M>;
21
22    fn into_dyn(self) -> DynP2PConnection<M>
23    where
24        Self: Sized,
25    {
26        Box::new(self)
27    }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum LegacyMessage<M> {
32    Message(M),
33    Ping,
34}
35
36#[async_trait]
37impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
38where
39    M: Serialize + DeserializeOwned + Send + 'static,
40{
41    async fn send(&mut self, message: M) -> anyhow::Result<()> {
42        let mut bytes = Vec::new();
43
44        bincode::serialize_into(&mut bytes, &LegacyMessage::Message(message))?;
45
46        SinkExt::send(self, Bytes::from_owner(bytes)).await?;
47
48        Ok(())
49    }
50
51    async fn receive(&mut self) -> anyhow::Result<M> {
52        loop {
53            let bytes = self.next().await.context("Framed stream is closed")??;
54
55            if let Ok(legacy_message) = bincode::deserialize_from(Cursor::new(&bytes)) {
56                match legacy_message {
57                    LegacyMessage::Message(message) => return Ok(message),
58                    LegacyMessage::Ping => continue,
59                }
60            }
61
62            return Ok(bincode::deserialize_from(Cursor::new(&bytes))?);
63        }
64    }
65}
66
67#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
68pub mod iroh {
69    use async_trait::async_trait;
70    use fedimint_core::encoding::{Decodable, Encodable};
71    use fedimint_core::module::registry::ModuleDecoderRegistry;
72    use iroh::endpoint::Connection;
73
74    use crate::net::p2p_connection::IP2PConnection;
75
76    #[async_trait]
77    impl<M> IP2PConnection<M> for Connection
78    where
79        M: Encodable + Decodable + Send + 'static,
80    {
81        async fn send(&mut self, message: M) -> anyhow::Result<()> {
82            let mut sink = self.open_uni().await?;
83
84            sink.write_all(&message.consensus_encode_to_vec()).await?;
85
86            sink.finish()?;
87
88            Ok(())
89        }
90
91        async fn receive(&mut self) -> anyhow::Result<M> {
92            let bytes = self.accept_uni().await?.read_to_end(1_000_000_000).await?;
93
94            Ok(Decodable::consensus_decode_whole(
95                &bytes,
96                &ModuleDecoderRegistry::default(),
97            )?)
98        }
99    }
100}