// fedimint_server/consensus/mod.rs

pub mod aleph_bft;
pub mod api;
pub mod db;
pub mod debug;
pub mod engine;
pub mod transaction;

use std::collections::BTreeMap;
use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;

use anyhow::bail;
use async_channel::Sender;
use db::get_global_database_migrations;
use fedimint_api_client::api::net::Connector;
use fedimint_api_client::api::{DynGlobalApi, P2PConnectionStatus};
use fedimint_core::core::{ModuleInstanceId, ModuleKind};
use fedimint_core::db::{apply_migrations, apply_migrations_server_dbtx, Database};
use fedimint_core::envs::is_running_in_test_env;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::registry::ModuleRegistry;
use fedimint_core::task::TaskGroup;
use fedimint_core::NumPeers;
use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
use jsonrpsee::server::ServerHandle;
use tokio::sync::watch;
use tracing::{info, warn};

use crate::config::{ServerConfig, ServerConfigLocal};
use crate::consensus::api::ConsensusApi;
use crate::consensus::engine::ConsensusEngine;
use crate::envs::{FM_DB_CHECKPOINT_RETENTION_DEFAULT, FM_DB_CHECKPOINT_RETENTION_ENV};
use crate::net::api::announcement::get_api_urls;
use crate::net::api::{ApiSecrets, RpcHandlerCtx};
use crate::{net, update_server_info_version_dbtx};

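/// Capacity of the bounded channel through which the consensus API and the
/// per-module proposal tasks submit consensus items to the consensus engine.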
const TRANSACTION_BUFFER: usize = 1000;

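/// Runs the consensus server: applies database migrations, initializes all
/// configured modules, starts the consensus API, spawns a consensus item
/// proposal task per module and then drives the consensus engine until
/// shutdown.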
#[allow(clippy::too_many_arguments)]
pub async fn run(
    p2p_bind_addr: SocketAddr,
    api_bind_addr: SocketAddr,
    cfg: ServerConfig,
    db: Database,
    module_init_registry: ServerModuleInitRegistry,
    task_group: &TaskGroup,
    force_api_secrets: ApiSecrets,
    data_dir: PathBuf,
    code_version_str: String,
) -> anyhow::Result<()> {
    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;

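    // Apply the global (server-wide) database migrations and persist the
    // current code version before anything else touches the database.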
    let mut global_dbtx = db.begin_transaction().await;
    apply_migrations_server_dbtx(
        &mut global_dbtx.to_ref_nc(),
        "fedimint-server".to_string(),
        get_global_database_migrations(),
    )
    .await?;

    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
    global_dbtx.commit_tx_result().await?;

    let mut modules = BTreeMap::new();

    let global_api = DynGlobalApi::from_endpoints(
        cfg.consensus
            .api_endpoints
            .iter()
            .map(|(&peer_id, url)| (peer_id, url.url.clone())),
        &None,
        &Connector::Tcp,
    );

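    // Initialize each module from the consensus config, applying its database
    // migrations first; a configured module without a registered initializer
    // aborts startup.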
    for (module_id, module_cfg) in &cfg.consensus.modules {
        match module_init_registry.get(&module_cfg.kind) {
            Some(module_init) => {
                info!(target: LOG_CORE, "Initialise module {module_id}");

                apply_migrations(
                    &db,
                    module_init.module_kind().to_string(),
                    module_init.get_database_migrations(),
                    Some(*module_id),
                    None,
                )
                .await?;

                let module = module_init
                    .init(
                        NumPeers::from(cfg.consensus.api_endpoints.len()),
                        cfg.get_module_config(*module_id)?,
                        db.with_prefix_module_id(*module_id).0,
                        task_group,
                        cfg.local.identity,
                        global_api.with_module(*module_id),
                    )
                    .await?;

                modules.insert(*module_id, (module_cfg.kind.clone(), module));
            }
            None => bail!("Detected configuration for unsupported module id: {module_id}"),
        };
    }

    let module_registry = ModuleRegistry::from(modules);

    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;

    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
    let (shutdown_sender, shutdown_receiver) = watch::channel(None);

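    // Create a pair of watch channels per peer: one for its p2p connection
    // status and one for its consensus item (ci) status. The receivers are
    // exposed through the consensus API.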
    let mut p2p_status_senders = BTreeMap::new();
    let mut ci_status_senders = BTreeMap::new();
    let mut status_receivers = BTreeMap::new();

    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
        let (p2p_sender, p2p_receiver) = watch::channel(P2PConnectionStatus::Disconnected);
        let (ci_sender, ci_receiver) = watch::channel(None);

        p2p_status_senders.insert(peer, p2p_sender);
        ci_status_senders.insert(peer, ci_sender);

        status_receivers.insert(peer, (p2p_receiver, ci_receiver));
    }

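    // Assemble the shared state behind the consensus API endpoints.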
    let consensus_api = ConsensusApi {
        cfg: cfg.clone(),
        db: db.clone(),
        modules: module_registry.clone(),
        client_cfg: client_cfg.clone(),
        submission_sender: submission_sender.clone(),
        shutdown_sender,
        shutdown_receiver: shutdown_receiver.clone(),
        supported_api_versions: ServerConfig::supported_api_versions_summary(
            &cfg.consensus.modules,
            &module_init_registry,
        ),
        status_receivers,
        force_api_secret: force_api_secrets.get_active(),
        code_version_str,
    };

    info!(target: LOG_CONSENSUS, "Starting Consensus Api");

    let api_handler = start_consensus_api(
        &cfg.local,
        consensus_api,
        force_api_secrets.clone(),
        api_bind_addr,
    )
    .await;

    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals");

    for (module_id, kind, module) in module_registry.iter_modules() {
        submit_module_ci_proposals(
            task_group,
            db.clone(),
            module_id,
            kind.clone(),
            module.clone(),
            submission_sender.clone(),
        );
    }

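    // Read the database checkpoint retention from the environment, falling
    // back to the default; an unparsable value aborts startup.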
    let checkpoint_retention: String = env::var(FM_DB_CHECKPOINT_RETENTION_ENV)
        .unwrap_or(FM_DB_CHECKPOINT_RETENTION_DEFAULT.to_string());
    let checkpoint_retention = checkpoint_retention.parse().unwrap_or_else(|_| {
        panic!("FM_DB_CHECKPOINT_RETENTION_ENV var is invalid: {checkpoint_retention}")
    });

    info!(target: LOG_CONSENSUS, "Starting Consensus Engine");

    let api_urls = get_api_urls(&db, &cfg.consensus).await;

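    // Drive the consensus engine; this call blocks until the engine shuts
    // down.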
    ConsensusEngine {
        db,
        federation_api: DynGlobalApi::from_endpoints(
            api_urls,
            &force_api_secrets.get_active(),
            &Connector::default(),
        ),
        self_id_str: cfg.local.identity.to_string(),
        peer_id_str: (0..cfg.consensus.api_endpoints.len())
            .map(|x| x.to_string())
            .collect(),
        cfg: cfg.clone(),
        p2p_status_senders,
        ci_status_senders,
        submission_receiver,
        shutdown_receiver,
        modules: module_registry,
        task_group: task_group.clone(),
        data_dir,
        checkpoint_retention,
        p2p_bind_addr,
    }
    .run()
    .await?;

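    // The consensus engine has shut down, so stop the API server and wait
    // for it to terminate.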
    api_handler
        .stop()
        .expect("Consensus api should still be running");

    api_handler.stopped().await;

    Ok(())
}

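/// Attaches the core server endpoints and each module's API endpoints to a
/// fresh JSON-RPC module, then spawns the consensus API server on the given
/// socket address.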
async fn start_consensus_api(
    cfg: &ServerConfigLocal,
    api: ConsensusApi,
    force_api_secrets: ApiSecrets,
    api_bind: SocketAddr,
) -> ServerHandle {
    let mut rpc_module = RpcHandlerCtx::new_module(api.clone());

    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);

    for (id, _, module) in api.modules.iter_modules() {
        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
    }

    net::api::spawn(
        "consensus",
        api_bind,
        rpc_module,
        cfg.max_connections,
        force_api_secrets,
    )
    .await
}

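/// Maximum time a module may take to produce its consensus item proposals
/// before a warning is logged for that polling round.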
const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);

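/// Spawns a task that periodically polls the module for consensus item
/// proposals and forwards them to the consensus engine through the
/// submission channel. The polling interval is much shorter in test
/// environments.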
fn submit_module_ci_proposals(
    task_group: &TaskGroup,
    db: Database,
    module_id: ModuleInstanceId,
    kind: ModuleKind,
    module: DynServerModule,
    submission_sender: Sender<ConsensusItem>,
) {
    let mut interval = tokio::time::interval(if is_running_in_test_env() {
        Duration::from_millis(100)
    } else {
        Duration::from_secs(1)
    });

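    // Poll the module for proposals on every interval tick until the task
    // group shuts down.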
    task_group.spawn(
        format!("citem_proposals_{module_id}"),
        move |task_handle| async move {
            while !task_handle.is_shutting_down() {
                let module_consensus_items = tokio::time::timeout(
                    CONSENSUS_PROPOSAL_TIMEOUT,
                    module.consensus_proposal(
                        &mut db
                            .begin_transaction_nc()
                            .await
                            .to_ref_with_prefix_module_id(module_id)
                            .0
                            .into_nc(),
                        module_id,
                    ),
                )
                .await;

                match module_consensus_items {
                    Ok(items) => {
                        for item in items {
                            if submission_sender
                                .send(ConsensusItem::Module(item))
                                .await
                                .is_err()
                            {
                                warn!(
                                    target: LOG_CONSENSUS,
                                    module_id,
                                    "Unable to submit module consensus item proposal via channel"
                                );
                            }
                        }
                    }
                    Err(..) => {
                        warn!(
                            target: LOG_CONSENSUS,
                            module_id,
                            %kind,
                            "Module failed to propose consensus items on time"
                        );
                    }
                }

                interval.tick().await;
            }
        },
    );
}