fedimint_server/consensus/aleph_bft/
network.rs

1use bitcoin::hashes::{sha256, Hash};
2use fedimint_core::encoding::Encodable;
3use fedimint_core::net::peers::IPeerConnections;
4use parity_scale_codec::{Decode, Encode, IoReader};
5
6use super::data_provider::UnitData;
7use super::keychain::Keychain;
8use super::{Message, Recipient};
9use crate::net::peers::ReconnectPeerConnections;
10
11#[derive(Debug, Clone, Eq, PartialEq)]
12pub struct Hasher;
13
14impl aleph_bft::Hasher for Hasher {
15    type Hash = [u8; 32];
16
17    fn hash(input: &[u8]) -> Self::Hash {
18        input.consensus_hash::<sha256::Hash>().to_byte_array()
19    }
20}
21
22pub type NetworkData = aleph_bft::NetworkData<
23    Hasher,
24    UnitData,
25    <Keychain as aleph_bft::Keychain>::Signature,
26    <Keychain as aleph_bft::MultiKeychain>::PartialMultisignature,
27>;
28
29pub struct Network {
30    connections: ReconnectPeerConnections<Message>,
31}
32
33impl Network {
34    pub fn new(connections: ReconnectPeerConnections<Message>) -> Self {
35        Self { connections }
36    }
37}
38
39#[async_trait::async_trait]
40impl aleph_bft::Network<NetworkData> for Network {
41    fn send(&self, network_data: NetworkData, recipient: aleph_bft::Recipient) {
42        // convert from aleph_bft::Recipient to session::Recipient
43        let recipient = match recipient {
44            aleph_bft::Recipient::Node(node_index) => {
45                Recipient::Peer(super::to_peer_id(node_index))
46            }
47            aleph_bft::Recipient::Everyone => Recipient::Everyone,
48        };
49
50        // since NetworkData does not implement Encodable we use
51        // parity_scale_codec::Encode to serialize it such that Message can
52        // implement Encodable
53        self.connections
54            .send_sync(&Message(network_data.encode()), recipient);
55    }
56
57    async fn next_event(&mut self) -> Option<NetworkData> {
58        while let Ok(message) = self.connections.receive().await {
59            if let Ok(network_data) = NetworkData::decode(&mut IoReader(message.1 .0.as_slice())) {
60                // in order to bound the RAM consumption of a session we have to bound an
61                // individual units size, hence the size of its attached unitdata in memory
62                if network_data.included_data().iter().all(UnitData::is_valid) {
63                    return Some(network_data);
64                }
65            }
66        }
67        // this prevents the aleph session from shutting down when the
68        // network data sender is dropped by the message relay task
69        std::future::pending().await
70    }
71}