fedimint_server/consensus/mod.rs

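//! Server-side consensus wiring: database migrations, module initialization,
//! the consensus API and the consensus engine.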
pub mod aleph_bft;
pub mod api;
pub mod db;
pub mod debug;
pub mod engine;
pub mod transaction;

use std::collections::BTreeMap;
use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;

use anyhow::bail;
use async_channel::Sender;
use db::get_global_database_migrations;
use fedimint_api_client::api::net::Connector;
use fedimint_api_client::api::{DynGlobalApi, P2PConnectionStatus};
use fedimint_core::core::{ModuleInstanceId, ModuleKind};
use fedimint_core::db::{apply_migrations, apply_migrations_server_dbtx, Database};
use fedimint_core::envs::is_running_in_test_env;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::registry::ModuleRegistry;
use fedimint_core::task::TaskGroup;
use fedimint_core::NumPeers;
use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
use jsonrpsee::server::ServerHandle;
use tokio::sync::watch;
use tracing::{info, warn};

use crate::config::{ServerConfig, ServerConfigLocal};
use crate::consensus::api::ConsensusApi;
use crate::consensus::engine::ConsensusEngine;
use crate::envs::{FM_DB_CHECKPOINT_RETENTION_DEFAULT, FM_DB_CHECKPOINT_RETENTION_ENV};
use crate::net::api::announcement::get_api_urls;
use crate::net::api::{ApiSecrets, RpcHandlerCtx};
use crate::{net, update_server_info_version_dbtx};

/// How many txs can be stored in memory before blocking the API
const TRANSACTION_BUFFER: usize = 1000;

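/// Runs the consensus server until shutdown: applies the global and
/// per-module database migrations, initializes all configured modules,
/// starts the consensus API, spawns the per-module consensus item proposal
/// tasks and finally drives the consensus engine.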
#[allow(clippy::too_many_arguments)]
pub async fn run(
    p2p_bind_addr: SocketAddr,
    api_bind_addr: SocketAddr,
    cfg: ServerConfig,
    db: Database,
    module_init_registry: ServerModuleInitRegistry,
    task_group: &TaskGroup,
    force_api_secrets: ApiSecrets,
    data_dir: PathBuf,
    code_version_str: String,
) -> anyhow::Result<()> {
    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;

    let mut global_dbtx = db.begin_transaction().await;
    apply_migrations_server_dbtx(
        &mut global_dbtx.to_ref_nc(),
        "fedimint-server".to_string(),
        get_global_database_migrations(),
    )
    .await?;

    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
    global_dbtx.commit_tx_result().await?;

    let mut modules = BTreeMap::new();

    // TODO: make it work with all transports and federation secrets
    let global_api = DynGlobalApi::from_endpoints(
        cfg.consensus
            .api_endpoints
            .iter()
            .map(|(&peer_id, url)| (peer_id, url.url.clone())),
        &None,
        &Connector::Tcp,
    );

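    // Initialize every module listed in the consensus config, applying its
    // database migrations before constructing it with an isolated database
    // prefix and a module-scoped API client.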
    for (module_id, module_cfg) in &cfg.consensus.modules {
        match module_init_registry.get(&module_cfg.kind) {
            Some(module_init) => {
                info!(target: LOG_CORE, "Initialise module {module_id}");

                apply_migrations(
                    &db,
                    module_init.module_kind().to_string(),
                    module_init.get_database_migrations(),
                    Some(*module_id),
                    None,
                )
                .await?;

                let module = module_init
                    .init(
                        NumPeers::from(cfg.consensus.api_endpoints.len()),
                        cfg.get_module_config(*module_id)?,
                        db.with_prefix_module_id(*module_id).0,
                        task_group,
                        cfg.local.identity,
                        global_api.with_module(*module_id),
                    )
                    .await?;

                modules.insert(*module_id, (module_cfg.kind.clone(), module));
            }
            None => bail!("Detected configuration for unsupported module id: {module_id}"),
        };
    }

    let module_registry = ModuleRegistry::from(modules);

    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;

    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
    let (shutdown_sender, shutdown_receiver) = watch::channel(None);

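    // Per-peer watch channels: the consensus engine reports P2P connection
    // status and consensus item status through the senders, while the
    // consensus API holds the matching receivers.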
    let mut p2p_status_senders = BTreeMap::new();
    let mut ci_status_senders = BTreeMap::new();
    let mut status_receivers = BTreeMap::new();

    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
        let (p2p_sender, p2p_receiver) = watch::channel(P2PConnectionStatus::Disconnected);
        let (ci_sender, ci_receiver) = watch::channel(None);

        p2p_status_senders.insert(peer, p2p_sender);
        ci_status_senders.insert(peer, ci_sender);

        status_receivers.insert(peer, (p2p_receiver, ci_receiver));
    }

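    // Shared state backing the consensus API endpoints.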
    let consensus_api = ConsensusApi {
        cfg: cfg.clone(),
        db: db.clone(),
        modules: module_registry.clone(),
        client_cfg: client_cfg.clone(),
        submission_sender: submission_sender.clone(),
        shutdown_sender,
        shutdown_receiver: shutdown_receiver.clone(),
        supported_api_versions: ServerConfig::supported_api_versions_summary(
            &cfg.consensus.modules,
            &module_init_registry,
        ),
        status_receivers,
        force_api_secret: force_api_secrets.get_active(),
        code_version_str,
    };

    info!(target: LOG_CONSENSUS, "Starting Consensus Api");

    let api_handler = start_consensus_api(
        &cfg.local,
        consensus_api,
        force_api_secrets.clone(),
        api_bind_addr,
    )
    .await;

    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals");

    for (module_id, kind, module) in module_registry.iter_modules() {
        submit_module_ci_proposals(
            task_group,
            db.clone(),
            module_id,
            kind.clone(),
            module.clone(),
            submission_sender.clone(),
        );
    }

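    // Number of database checkpoints to retain, read from the environment via
    // FM_DB_CHECKPOINT_RETENTION_ENV; startup panics on an unparsable value.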
    let checkpoint_retention: String = env::var(FM_DB_CHECKPOINT_RETENTION_ENV)
        .unwrap_or(FM_DB_CHECKPOINT_RETENTION_DEFAULT.to_string());
    let checkpoint_retention = checkpoint_retention.parse().unwrap_or_else(|_| {
        panic!("FM_DB_CHECKPOINT_RETENTION_ENV var is invalid: {checkpoint_retention}")
    });

    info!(target: LOG_CONSENSUS, "Starting Consensus Engine");

    let api_urls = get_api_urls(&db, &cfg.consensus).await;

    // FIXME: (@leonardo) How should this be handled?
    // Using the `Connector::default()` for now!
    ConsensusEngine {
        db,
        federation_api: DynGlobalApi::from_endpoints(
            api_urls,
            &force_api_secrets.get_active(),
            &Connector::default(),
        ),
        self_id_str: cfg.local.identity.to_string(),
        peer_id_str: (0..cfg.consensus.api_endpoints.len())
            .map(|x| x.to_string())
            .collect(),
        cfg: cfg.clone(),
        p2p_status_senders,
        ci_status_senders,
        submission_receiver,
        shutdown_receiver,
        modules: module_registry,
        task_group: task_group.clone(),
        data_dir,
        checkpoint_retention,
        p2p_bind_addr,
    }
    .run()
    .await?;

    api_handler
        .stop()
        .expect("Consensus api should still be running");

    api_handler.stopped().await;

    Ok(())
}

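/// Spawns the JSON-RPC API server on `api_bind`, attaching the core consensus
/// endpoints as well as the API endpoints of every registered module.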
async fn start_consensus_api(
    cfg: &ServerConfigLocal,
    api: ConsensusApi,
    force_api_secrets: ApiSecrets,
    api_bind: SocketAddr,
) -> ServerHandle {
    let mut rpc_module = RpcHandlerCtx::new_module(api.clone());

    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);

    for (id, _, module) in api.modules.iter_modules() {
        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
    }

    net::api::spawn(
        "consensus",
        api_bind,
        rpc_module,
        cfg.max_connections,
        force_api_secrets,
    )
    .await
}

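/// Maximum time a module is given to return its consensus proposal before a
/// warning is logged and the attempt is skipped.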
const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);

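/// Spawns a background task that periodically queries the module for
/// consensus item proposals and forwards them to the submission channel.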
fn submit_module_ci_proposals(
    task_group: &TaskGroup,
    db: Database,
    module_id: ModuleInstanceId,
    kind: ModuleKind,
    module: DynServerModule,
    submission_sender: Sender<ConsensusItem>,
) {
    let mut interval = tokio::time::interval(if is_running_in_test_env() {
        Duration::from_millis(100)
    } else {
        Duration::from_secs(1)
    });

    task_group.spawn(
        format!("citem_proposals_{module_id}"),
        move |task_handle| async move {
            while !task_handle.is_shutting_down() {
                let module_consensus_items = tokio::time::timeout(
                    CONSENSUS_PROPOSAL_TIMEOUT,
                    module.consensus_proposal(
                        &mut db
                            .begin_transaction_nc()
                            .await
                            .to_ref_with_prefix_module_id(module_id)
                            .0
                            .into_nc(),
                        module_id,
                    ),
                )
                .await;

                match module_consensus_items {
                    Ok(items) => {
                        for item in items {
                            if submission_sender
                                .send(ConsensusItem::Module(item))
                                .await
                                .is_err()
                            {
                                warn!(
                                    target: LOG_CONSENSUS,
                                    module_id,
                                    "Unable to submit module consensus item proposal via channel"
                                );
                            }
                        }
                    }
                    Err(..) => {
                        warn!(
                            target: LOG_CONSENSUS,
                            module_id,
                            %kind,
                            "Module failed to propose consensus items on time"
                        );
                    }
                }

                interval.tick().await;
            }
        },
    );
}