// fedimint_server/consensus/mod.rs
pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::env;
10use std::net::SocketAddr;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::bail;
16use async_channel::Sender;
17use db::get_global_database_migrations;
18use fedimint_api_client::api::net::Connector;
19use fedimint_api_client::api::DynGlobalApi;
20use fedimint_core::config::ServerModuleInitRegistry;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{apply_migrations, apply_migrations_server, Database};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::server::DynServerModule;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::NumPeers;
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
30use jsonrpsee::server::ServerHandle;
31use tokio::sync::{watch, RwLock};
32use tracing::info;
33use tracing::log::warn;
34
35use crate::config::{ServerConfig, ServerConfigLocal};
36use crate::consensus::api::ConsensusApi;
37use crate::consensus::engine::ConsensusEngine;
38use crate::envs::{FM_DB_CHECKPOINT_RETENTION_DEFAULT, FM_DB_CHECKPOINT_RETENTION_ENV};
39use crate::net;
40use crate::net::api::announcement::get_api_urls;
41use crate::net::api::{ApiSecrets, RpcHandlerCtx};
42
43const TRANSACTION_BUFFER: usize = 1000;
45
46#[allow(clippy::too_many_arguments)]
47pub async fn run(
48 p2p_bind_addr: SocketAddr,
49 api_bind_addr: SocketAddr,
50 cfg: ServerConfig,
51 db: Database,
52 module_init_registry: ServerModuleInitRegistry,
53 task_group: &TaskGroup,
54 force_api_secrets: ApiSecrets,
55 data_dir: PathBuf,
56 code_version_str: String,
57) -> anyhow::Result<()> {
58 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
59
60 apply_migrations_server(
61 &db,
62 "fedimint-server".to_string(),
63 get_global_database_migrations(),
64 )
65 .await?;
66
67 let mut modules = BTreeMap::new();
68
69 for (module_id, module_cfg) in &cfg.consensus.modules {
70 match module_init_registry.get(&module_cfg.kind) {
71 Some(module_init) => {
72 info!(target: LOG_CORE, "Initialise module {module_id}");
73
74 apply_migrations(
75 &db,
76 module_init.module_kind().to_string(),
77 module_init.get_database_migrations(),
78 Some(*module_id),
79 None,
80 )
81 .await?;
82
83 let module = module_init
84 .init(
85 NumPeers::from(cfg.consensus.api_endpoints.len()),
86 cfg.get_module_config(*module_id)?,
87 db.with_prefix_module_id(*module_id).0,
88 task_group,
89 cfg.local.identity,
90 )
91 .await?;
92
93 modules.insert(*module_id, (module_cfg.kind.clone(), module));
94 }
95 None => bail!("Detected configuration for unsupported module id: {module_id}"),
96 };
97 }
98
99 let module_registry = ModuleRegistry::from(modules);
100
101 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
102
103 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
104 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
105 let connection_status_channels = Arc::new(RwLock::new(BTreeMap::new()));
106 let last_ci_by_peer = Arc::new(RwLock::new(BTreeMap::new()));
107
108 let consensus_api = ConsensusApi {
109 cfg: cfg.clone(),
110 db: db.clone(),
111 modules: module_registry.clone(),
112 client_cfg: client_cfg.clone(),
113 submission_sender: submission_sender.clone(),
114 shutdown_sender,
115 shutdown_receiver: shutdown_receiver.clone(),
116 supported_api_versions: ServerConfig::supported_api_versions_summary(
117 &cfg.consensus.modules,
118 &module_init_registry,
119 ),
120 last_ci_by_peer: last_ci_by_peer.clone(),
121 connection_status_channels: connection_status_channels.clone(),
122 force_api_secret: force_api_secrets.get_active(),
123 code_version_str,
124 };
125
126 info!(target: LOG_CONSENSUS, "Starting Consensus Api");
127
128 let api_handler = start_consensus_api(
129 &cfg.local,
130 consensus_api,
131 force_api_secrets.clone(),
132 api_bind_addr,
133 )
134 .await;
135
136 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals");
137
138 for (module_id, kind, module) in module_registry.iter_modules() {
139 submit_module_ci_proposals(
140 task_group,
141 db.clone(),
142 module_id,
143 kind.clone(),
144 module.clone(),
145 submission_sender.clone(),
146 );
147 }
148
149 let checkpoint_retention: String = env::var(FM_DB_CHECKPOINT_RETENTION_ENV)
150 .unwrap_or(FM_DB_CHECKPOINT_RETENTION_DEFAULT.to_string());
151 let checkpoint_retention = checkpoint_retention.parse().unwrap_or_else(|_| {
152 panic!("FM_DB_CHECKPOINT_RETENTION_ENV var is invalid: {checkpoint_retention}")
153 });
154
155 info!(target: LOG_CONSENSUS, "Starting Consensus Engine");
156
157 let api_urls = get_api_urls(&db, &cfg.consensus).await;
158
159 ConsensusEngine {
162 db,
163 federation_api: DynGlobalApi::from_endpoints(
164 api_urls,
165 &force_api_secrets.get_active(),
166 &Connector::default(),
167 ),
168 self_id_str: cfg.local.identity.to_string(),
169 peer_id_str: (0..cfg.consensus.api_endpoints.len())
170 .map(|x| x.to_string())
171 .collect(),
172 cfg: cfg.clone(),
173 connection_status_channels,
174 submission_receiver,
175 shutdown_receiver,
176 last_ci_by_peer,
177 modules: module_registry,
178 task_group: task_group.clone(),
179 data_dir,
180 checkpoint_retention,
181 p2p_bind_addr,
182 }
183 .run()
184 .await?;
185
186 api_handler
187 .stop()
188 .expect("Consensus api should still be running");
189
190 api_handler.stopped().await;
191
192 Ok(())
193}
194
195async fn start_consensus_api(
196 cfg: &ServerConfigLocal,
197 api: ConsensusApi,
198 force_api_secrets: ApiSecrets,
199 api_bind: SocketAddr,
200) -> ServerHandle {
201 let mut rpc_module = RpcHandlerCtx::new_module(api.clone());
202
203 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
204
205 for (id, _, module) in api.modules.iter_modules() {
206 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
207 }
208
209 net::api::spawn(
210 "consensus",
211 api_bind,
212 rpc_module,
213 cfg.max_connections,
214 force_api_secrets,
215 )
216 .await
217}
218
219const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
220
221fn submit_module_ci_proposals(
222 task_group: &TaskGroup,
223 db: Database,
224 module_id: ModuleInstanceId,
225 kind: ModuleKind,
226 module: DynServerModule,
227 submission_sender: Sender<ConsensusItem>,
228) {
229 let mut interval = tokio::time::interval(if is_running_in_test_env() {
230 Duration::from_millis(100)
231 } else {
232 Duration::from_secs(1)
233 });
234
235 task_group.spawn(
236 "submit_module_ci_proposals_{module_id}",
237 move |task_handle| async move {
238 while !task_handle.is_shutting_down() {
239 let module_consensus_items = tokio::time::timeout(
240 CONSENSUS_PROPOSAL_TIMEOUT,
241 module.consensus_proposal(
242 &mut db
243 .begin_transaction_nc()
244 .await
245 .to_ref_with_prefix_module_id(module_id).0
246 .into_nc(),
247 module_id,
248 ),
249 )
250 .await;
251
252 match module_consensus_items {
253 Ok(items) => {
254 for item in items {
255 submission_sender
256 .send(ConsensusItem::Module(item))
257 .await
258 .ok();
259 }
260 }
261 Err(..) => {
262 warn!(
263 target: LOG_CONSENSUS,
264 "Module {module_id} of kind {kind} failed to propose consensus items on time"
265 );
266 }
267 }
268
269 interval.tick().await;
270 }
271 },
272 );
273}