fedimint_server/consensus/mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::env;
10use std::net::SocketAddr;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::bail;
16use async_channel::Sender;
17use db::get_global_database_migrations;
18use fedimint_api_client::api::net::Connector;
19use fedimint_api_client::api::DynGlobalApi;
20use fedimint_core::config::ServerModuleInitRegistry;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{apply_migrations, apply_migrations_server, Database};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::server::DynServerModule;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::NumPeers;
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
30use jsonrpsee::server::ServerHandle;
31use tokio::sync::{watch, RwLock};
32use tracing::info;
33use tracing::log::warn;
34
35use crate::config::{ServerConfig, ServerConfigLocal};
36use crate::consensus::api::ConsensusApi;
37use crate::consensus::engine::ConsensusEngine;
38use crate::envs::{FM_DB_CHECKPOINT_RETENTION_DEFAULT, FM_DB_CHECKPOINT_RETENTION_ENV};
39use crate::net;
40use crate::net::api::announcement::get_api_urls;
41use crate::net::api::{ApiSecrets, RpcHandlerCtx};
42
/// Capacity of the bounded submission channel that feeds consensus items
/// (e.g. transactions submitted through the API) to the consensus engine.
/// Once this many items are queued, further submissions wait until the
/// engine drains the channel, which is what makes the API block.
const TRANSACTION_BUFFER: usize = 1_000;
45
46#[allow(clippy::too_many_arguments)]
47pub async fn run(
48    p2p_bind_addr: SocketAddr,
49    api_bind_addr: SocketAddr,
50    cfg: ServerConfig,
51    db: Database,
52    module_init_registry: ServerModuleInitRegistry,
53    task_group: &TaskGroup,
54    force_api_secrets: ApiSecrets,
55    data_dir: PathBuf,
56    code_version_str: String,
57) -> anyhow::Result<()> {
58    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
59
60    apply_migrations_server(
61        &db,
62        "fedimint-server".to_string(),
63        get_global_database_migrations(),
64    )
65    .await?;
66
67    let mut modules = BTreeMap::new();
68
69    for (module_id, module_cfg) in &cfg.consensus.modules {
70        match module_init_registry.get(&module_cfg.kind) {
71            Some(module_init) => {
72                info!(target: LOG_CORE, "Initialise module {module_id}");
73
74                apply_migrations(
75                    &db,
76                    module_init.module_kind().to_string(),
77                    module_init.get_database_migrations(),
78                    Some(*module_id),
79                    None,
80                )
81                .await?;
82
83                let module = module_init
84                    .init(
85                        NumPeers::from(cfg.consensus.api_endpoints.len()),
86                        cfg.get_module_config(*module_id)?,
87                        db.with_prefix_module_id(*module_id).0,
88                        task_group,
89                        cfg.local.identity,
90                    )
91                    .await?;
92
93                modules.insert(*module_id, (module_cfg.kind.clone(), module));
94            }
95            None => bail!("Detected configuration for unsupported module id: {module_id}"),
96        };
97    }
98
99    let module_registry = ModuleRegistry::from(modules);
100
101    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
102
103    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
104    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
105    let connection_status_channels = Arc::new(RwLock::new(BTreeMap::new()));
106    let last_ci_by_peer = Arc::new(RwLock::new(BTreeMap::new()));
107
108    let consensus_api = ConsensusApi {
109        cfg: cfg.clone(),
110        db: db.clone(),
111        modules: module_registry.clone(),
112        client_cfg: client_cfg.clone(),
113        submission_sender: submission_sender.clone(),
114        shutdown_sender,
115        shutdown_receiver: shutdown_receiver.clone(),
116        supported_api_versions: ServerConfig::supported_api_versions_summary(
117            &cfg.consensus.modules,
118            &module_init_registry,
119        ),
120        last_ci_by_peer: last_ci_by_peer.clone(),
121        connection_status_channels: connection_status_channels.clone(),
122        force_api_secret: force_api_secrets.get_active(),
123        code_version_str,
124    };
125
126    info!(target: LOG_CONSENSUS, "Starting Consensus Api");
127
128    let api_handler = start_consensus_api(
129        &cfg.local,
130        consensus_api,
131        force_api_secrets.clone(),
132        api_bind_addr,
133    )
134    .await;
135
136    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals");
137
138    for (module_id, kind, module) in module_registry.iter_modules() {
139        submit_module_ci_proposals(
140            task_group,
141            db.clone(),
142            module_id,
143            kind.clone(),
144            module.clone(),
145            submission_sender.clone(),
146        );
147    }
148
149    let checkpoint_retention: String = env::var(FM_DB_CHECKPOINT_RETENTION_ENV)
150        .unwrap_or(FM_DB_CHECKPOINT_RETENTION_DEFAULT.to_string());
151    let checkpoint_retention = checkpoint_retention.parse().unwrap_or_else(|_| {
152        panic!("FM_DB_CHECKPOINT_RETENTION_ENV var is invalid: {checkpoint_retention}")
153    });
154
155    info!(target: LOG_CONSENSUS, "Starting Consensus Engine");
156
157    let api_urls = get_api_urls(&db, &cfg.consensus).await;
158
159    // FIXME: (@leonardo) How should this be handled ?
160    // Using the `Connector::default()` for now!
161    ConsensusEngine {
162        db,
163        federation_api: DynGlobalApi::from_endpoints(
164            api_urls,
165            &force_api_secrets.get_active(),
166            &Connector::default(),
167        ),
168        self_id_str: cfg.local.identity.to_string(),
169        peer_id_str: (0..cfg.consensus.api_endpoints.len())
170            .map(|x| x.to_string())
171            .collect(),
172        cfg: cfg.clone(),
173        connection_status_channels,
174        submission_receiver,
175        shutdown_receiver,
176        last_ci_by_peer,
177        modules: module_registry,
178        task_group: task_group.clone(),
179        data_dir,
180        checkpoint_retention,
181        p2p_bind_addr,
182    }
183    .run()
184    .await?;
185
186    api_handler
187        .stop()
188        .expect("Consensus api should still be running");
189
190    api_handler.stopped().await;
191
192    Ok(())
193}
194
195async fn start_consensus_api(
196    cfg: &ServerConfigLocal,
197    api: ConsensusApi,
198    force_api_secrets: ApiSecrets,
199    api_bind: SocketAddr,
200) -> ServerHandle {
201    let mut rpc_module = RpcHandlerCtx::new_module(api.clone());
202
203    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
204
205    for (id, _, module) in api.modules.iter_modules() {
206        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
207    }
208
209    net::api::spawn(
210        "consensus",
211        api_bind,
212        rpc_module,
213        cfg.max_connections,
214        force_api_secrets,
215    )
216    .await
217}
218
219const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
220
221fn submit_module_ci_proposals(
222    task_group: &TaskGroup,
223    db: Database,
224    module_id: ModuleInstanceId,
225    kind: ModuleKind,
226    module: DynServerModule,
227    submission_sender: Sender<ConsensusItem>,
228) {
229    let mut interval = tokio::time::interval(if is_running_in_test_env() {
230        Duration::from_millis(100)
231    } else {
232        Duration::from_secs(1)
233    });
234
235    task_group.spawn(
236        "submit_module_ci_proposals_{module_id}",
237        move |task_handle| async move {
238            while !task_handle.is_shutting_down() {
239                let module_consensus_items = tokio::time::timeout(
240                    CONSENSUS_PROPOSAL_TIMEOUT,
241                    module.consensus_proposal(
242                        &mut db
243                            .begin_transaction_nc()
244                            .await
245                            .to_ref_with_prefix_module_id(module_id).0
246                            .into_nc(),
247                        module_id,
248                    ),
249                )
250                .await;
251
252                match module_consensus_items {
253                    Ok(items) => {
254                        for item in items {
255                            submission_sender
256                                .send(ConsensusItem::Module(item))
257                                .await
258                                .ok();
259                        }
260                    }
261                    Err(..) => {
262                        warn!(
263                            target: LOG_CONSENSUS,
264                            "Module {module_id} of kind {kind} failed to propose consensus items on time"
265                        );
266                    }
267                }
268
269                interval.tick().await;
270            }
271        },
272    );
273}