1use std::collections::BTreeMap;
2use std::fs;
3use std::net::SocketAddr;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use aleph_bft::Keychain as KeychainTrait;
9use anyhow::{anyhow, bail};
10use async_channel::Receiver;
11use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerConnectionStatus};
12use fedimint_api_client::query::FilterMap;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::fmt_utils::OptStacktrace;
19use fedimint_core::module::audit::Audit;
20use fedimint_core::module::registry::{ModuleDecoderRegistry, ServerModuleRegistry};
21use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
22use fedimint_core::runtime::spawn;
23use fedimint_core::session_outcome::{
24 AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
25};
26use fedimint_core::task::{sleep, TaskGroup, TaskHandle};
27use fedimint_core::timing::TimeReporter;
28use fedimint_core::{timing, NumPeers, NumPeersExt, PeerId};
29use futures::StreamExt;
30use rand::Rng;
31use tokio::sync::{watch, RwLock};
32use tracing::{debug, info, instrument, warn, Level};
33
34use crate::config::ServerConfig;
35use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
36use crate::consensus::aleph_bft::data_provider::{get_citem_bytes_chsum, DataProvider, UnitData};
37use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
38use crate::consensus::aleph_bft::keychain::Keychain;
39use crate::consensus::aleph_bft::network::Network;
40use crate::consensus::aleph_bft::spawner::Spawner;
41use crate::consensus::aleph_bft::{to_node_index, Message};
42use crate::consensus::db::{
43 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
44 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
45};
46use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
47use crate::consensus::transaction::process_transaction_with_dbtx;
48use crate::fedimint_core::encoding::Encodable;
49use crate::metrics::{
50 CONSENSUS_ITEMS_PROCESSED_TOTAL, CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
51 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ORDERING_LATENCY_SECONDS,
52 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX, CONSENSUS_SESSION_COUNT,
53};
54use crate::net::connect::{Connector, TlsTcpConnector};
55use crate::net::peers::{DelayCalculator, ReconnectPeerConnections};
56use crate::LOG_CONSENSUS;
57
/// Name of the sub-directory, joined onto the engine's `data_dir`, under which
/// per-session database checkpoints are created.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
/// State needed to run the server consensus loop: processes consensus items
/// session by session and persists a signed outcome for every finished
/// session.
pub struct ConsensusEngine {
    /// Server modules; used to decode, process and audit consensus items.
    pub modules: ServerModuleRegistry,
    /// Database holding accepted items, accepted transactions and signed
    /// session outcomes.
    pub db: Database,
    /// API client used to query the peers' config hash and to request signed
    /// session outcomes.
    pub federation_api: DynGlobalApi,
    /// Configuration of this server.
    pub cfg: ServerConfig,
    /// Source of consensus items awaiting processing.
    pub submission_receiver: Receiver<ConsensusItem>,
    /// When set to `Some(index)`, the run loops stop after completing the
    /// session with that index.
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    /// For every peer, the index of the session in which one of its consensus
    /// items was last processed.
    pub last_ci_by_peer: Arc<RwLock<BTreeMap<PeerId, u64>>>,
    /// Label value identifying this server in metrics.
    // NOTE(review): presumably the string form of `cfg.local.identity` — confirm.
    pub self_id_str: String,
    /// Metric label per peer, indexed by `PeerId::to_usize()`.
    pub peer_id_str: Vec<String>,
    /// Shared connection status map handed to the peer connection layer.
    pub connection_status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    /// Task group used to create shutdown handles and to spawn session tasks.
    pub task_group: TaskGroup,
    /// Base directory; database checkpoints live in a sub-directory of it.
    pub data_dir: PathBuf,
    /// Number of most recent sessions to keep checkpoints for; `0` disables
    /// checkpoint creation.
    pub checkpoint_retention: u64,
    /// Address used to build the p2p network configuration.
    pub p2p_bind_addr: SocketAddr,
}
80
81impl ConsensusEngine {
82 fn num_peers(&self) -> NumPeers {
83 self.cfg.consensus.broadcast_public_keys.to_num_peers()
84 }
85
86 fn identity(&self) -> PeerId {
87 self.cfg.local.identity
88 }
89
90 #[instrument(name = "run", skip_all, fields(id=%self.cfg.local.identity))]
91 pub async fn run(self) -> anyhow::Result<()> {
92 if self.num_peers().total() == 1 {
93 self.run_single_guardian(self.task_group.make_handle())
94 .await
95 } else {
96 self.run_consensus(self.p2p_bind_addr, self.task_group.make_handle())
97 .await
98 }
99 }
100
101 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
102 assert_eq!(self.num_peers(), NumPeers::from(1));
103
104 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
105
106 while !task_handle.is_shutting_down() {
107 let session_index = self.get_finished_session_count().await;
108
109 CONSENSUS_SESSION_COUNT.set(session_index as i64);
110
111 let mut item_index = self.pending_accepted_items().await.len() as u64;
112
113 let session_start_time = std::time::Instant::now();
114
115 while let Ok(item) = self.submission_receiver.recv().await {
116 if self
117 .process_consensus_item(session_index, item_index, item, self.identity())
118 .await
119 .is_ok()
120 {
121 item_index += 1;
122 }
123
124 if session_start_time.elapsed() > Duration::from_secs(60) {
126 break;
127 }
128 }
129
130 let session_outcome = SessionOutcome {
131 items: self.pending_accepted_items().await,
132 };
133
134 let header = session_outcome.header(session_index);
135 let signature = Keychain::new(&self.cfg).sign(&header);
136 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
137
138 self.complete_session(
139 session_index,
140 SignedSessionOutcome {
141 session_outcome,
142 signatures,
143 },
144 )
145 .await;
146
147 self.checkpoint_database(session_index);
148
149 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
150
151 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
152 break;
153 }
154 }
155
156 info!(target: LOG_CONSENSUS, "Consensus task shut down");
157
158 Ok(())
159 }
160
161 pub async fn run_consensus(
162 &self,
163 p2p_bind_addr: SocketAddr,
164 task_handle: TaskHandle,
165 ) -> anyhow::Result<()> {
166 assert!(self.num_peers().total() >= 4);
168
169 self.confirm_server_config_consensus_hash().await?;
170
171 let connections = ReconnectPeerConnections::new(
173 self.cfg.network_config(p2p_bind_addr),
174 DelayCalculator::PROD_DEFAULT,
175 TlsTcpConnector::new(self.cfg.tls_config(), self.identity()).into_dyn(),
176 &self.task_group,
177 Arc::clone(&self.connection_status_channels),
178 )
179 .await;
180
181 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
182
183 while !task_handle.is_shutting_down() {
184 let session_index = self.get_finished_session_count().await;
185
186 CONSENSUS_SESSION_COUNT.set(session_index as i64);
187
188 self.run_session(connections.clone(), session_index).await?;
189
190 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
191
192 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
193 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
194
195 sleep(Duration::from_secs(60)).await;
196
197 break;
198 }
199 }
200
201 info!(target: LOG_CONSENSUS, "Consensus task shut down");
202
203 Ok(())
204 }
205
206 async fn confirm_server_config_consensus_hash(&self) -> anyhow::Result<()> {
207 let our_hash = self.cfg.consensus.consensus_hash();
208
209 info!(target: LOG_CONSENSUS, "Waiting for peers config {our_hash}");
210
211 loop {
212 match self.federation_api.server_config_consensus_hash().await {
213 Ok(consensus_hash) => {
214 if consensus_hash != our_hash {
215 bail!("Our consensus config doesn't match peers!")
216 }
217
218 info!(target: LOG_CONSENSUS, "Confirmed peers config {our_hash}");
219
220 return Ok(());
221 }
222 Err(e) => {
223 warn!(target: LOG_CONSENSUS, "Could not check consensus config hash: {}", OptStacktrace(e));
224 }
225 }
226
227 sleep(Duration::from_millis(100)).await;
228 }
229 }
230
231 pub async fn run_session(
232 &self,
233 connections: ReconnectPeerConnections<Message>,
234 session_index: u64,
235 ) -> anyhow::Result<()> {
236 const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
254 const BASE: f64 = 1.02;
255
256 let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
257 let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
258
259 let mut delay_config = aleph_bft::default_delay_config();
260
261 delay_config.unit_creation_delay = Arc::new(move |round_index| {
262 let delay = if round_index == 0 {
263 0.0
264 } else {
265 round_delay
266 * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
267 * rand::thread_rng().gen_range(0.5..=1.5)
268 };
269
270 Duration::from_millis(delay.round() as u64)
271 });
272
273 let config = aleph_bft::create_config(
274 self.num_peers().total().into(),
275 self.identity().to_usize().into(),
276 session_index,
277 self.cfg
278 .consensus
279 .broadcast_rounds_per_session
280 .checked_add(EXP_SLOWDOWN_ROUNDS)
281 .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
282 delay_config,
283 Duration::from_secs(10 * 365 * 24 * 60 * 60),
284 )
285 .expect("The exponential slowdown exceeds 10 years");
286
287 let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
290 let (signature_sender, signature_receiver) = watch::channel(None);
291 let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
292 let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
293
294 let aleph_handle = spawn(
295 "aleph run session",
296 aleph_bft::run_session(
297 config,
298 aleph_bft::LocalIO::new(
299 DataProvider::new(
300 self.submission_receiver.clone(),
301 signature_receiver,
302 timestamp_sender,
303 ),
304 FinalizationHandler::new(unit_data_sender),
305 BackupWriter::new(self.db.clone()).await,
306 BackupReader::new(self.db.clone()),
307 ),
308 Network::new(connections),
309 Keychain::new(&self.cfg),
310 Spawner::new(self.task_group.make_subgroup()),
311 aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
312 ),
313 );
314
315 let signed_session_outcome = self
316 .complete_signed_session_outcome(
317 session_index,
318 unit_data_receiver,
319 signature_sender,
320 timestamp_receiver,
321 )
322 .await?;
323
324 terminator_sender.send(()).ok();
327 aleph_handle.await.ok();
328
329 self.complete_session(session_index, signed_session_outcome)
333 .await;
334
335 self.checkpoint_database(session_index);
336
337 Ok(())
338 }
339
340 pub async fn complete_signed_session_outcome(
341 &self,
342 session_index: u64,
343 ordered_unit_receiver: Receiver<OrderedUnit>,
344 signature_sender: watch::Sender<Option<SchnorrSignature>>,
345 timestamp_receiver: Receiver<(Instant, u64)>,
346 ) -> anyhow::Result<SignedSessionOutcome> {
347 let mut item_index = 0;
350
351 let mut request_signed_session_outcome = Box::pin(async {
352 self.request_signed_session_outcome(&self.federation_api, session_index)
353 .await
354 });
355
356 loop {
360 tokio::select! {
361 ordered_unit = ordered_unit_receiver.recv() => {
362 let ordered_unit = ordered_unit?;
363
364 if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
365 break;
366 }
367
368 if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
369 if ordered_unit.creator == self.identity() {
370 loop {
371 match timestamp_receiver.try_recv() {
372 Ok((timestamp, chsum)) => {
373 if get_citem_bytes_chsum(&bytes) == chsum {
374 CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
375 break;
376 }
377 warn!(target: LOG_CONSENSUS, "Not reporting ordering latency on possibly out of sync item");
378 }
379 Err(err) => {
380 debug!(target: LOG_CONSENSUS, %err, "Missing submission timestamp. This is normal on start");
381 break;
382 }
383 }
384 }
385 }
386
387 if let Ok(items) = Vec::<ConsensusItem>::consensus_decode(&mut bytes.as_slice(), &self.decoders()){
388 for item in items {
389 if self.process_consensus_item(
390 session_index,
391 item_index,
392 item.clone(),
393 ordered_unit.creator
394 ).await
395 .is_ok() {
396 item_index += 1;
397 }
398 }
399 }
400 }
401 },
402 signed_session_outcome = &mut request_signed_session_outcome => {
403 let pending_accepted_items = self.pending_accepted_items().await;
404
405 let (processed, unprocessed) = signed_session_outcome
407 .session_outcome
408 .items
409 .split_at(pending_accepted_items.len());
410
411 assert!(
412 processed.iter().eq(pending_accepted_items.iter()),
413 "Consensus Failure: pending accepted items disagree with federation consensus"
414 );
415
416 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
417 if let Err(err) = self.process_consensus_item(
418 session_index,
419 item_index as u64,
420 accepted_item.item.clone(),
421 accepted_item.peer
422 ).await {
423 panic!(
424 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
425 processed.len(),
426 unprocessed.len(),
427 );
428 }
429 }
430
431 return Ok(signed_session_outcome);
432 }
433 }
434 }
435
436 let items = self.pending_accepted_items().await;
437
438 assert_eq!(item_index, items.len() as u64);
439
440 let session_outcome = SessionOutcome { items };
441
442 let header = session_outcome.header(session_index);
443
444 let keychain = Keychain::new(&self.cfg);
445
446 signature_sender.send(Some(keychain.sign(&header)))?;
449
450 let mut signatures = BTreeMap::new();
451
452 let items_dump = tokio::sync::OnceCell::new();
453
454 while signatures.len() < self.num_peers().threshold() {
457 tokio::select! {
458 ordered_unit = ordered_unit_receiver.recv() => {
459 let ordered_unit = ordered_unit?;
460
461 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
462 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
463 signatures.insert(ordered_unit.creator, signature);
464 } else {
465 warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);
466
467 items_dump.get_or_init(|| async {
468 for (idx, item) in session_outcome.items.iter().enumerate() {
469 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
470 }
471 }).await;
472 }
473 }
474 }
475 signed_session_outcome = &mut request_signed_session_outcome => {
476 assert_eq!(
477 header,
478 signed_session_outcome.session_outcome.header(session_index),
479 "Consensus Failure: header disagrees with federation consensus"
480 );
481
482 return Ok(signed_session_outcome);
483 }
484 }
485 }
486
487 Ok(SignedSessionOutcome {
488 session_outcome,
489 signatures,
490 })
491 }
492
493 fn decoders(&self) -> ModuleDecoderRegistry {
494 self.modules.decoder_registry()
495 }
496
497 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
498 self.db
499 .begin_transaction_nc()
500 .await
501 .find_by_prefix(&AcceptedItemPrefix)
502 .await
503 .map(|entry| entry.1)
504 .collect()
505 .await
506 }
507
508 pub async fn complete_session(
509 &self,
510 session_index: u64,
511 signed_session_outcome: SignedSessionOutcome,
512 ) {
513 let mut dbtx = self.db.begin_transaction().await;
514
515 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
516
517 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
518
519 if dbtx
520 .insert_entry(
521 &SignedSessionOutcomeKey(session_index),
522 &signed_session_outcome,
523 )
524 .await
525 .is_some()
526 {
527 panic!("We tried to overwrite a signed session outcome");
528 }
529
530 dbtx.commit_tx_result()
531 .await
532 .expect("This is the only place where we write to this key");
533 }
534
535 fn db_checkpoints_dir(&self) -> PathBuf {
537 self.data_dir.join(DB_CHECKPOINTS_DIR)
538 }
539
540 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
544 let checkpoint_dir = self.db_checkpoints_dir();
545
546 if checkpoint_dir.exists() {
547 debug!(
548 ?current_session,
549 "Removing database checkpoints up to `current_session`"
550 );
551
552 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
553 if let Ok(file_name) = checkpoint.file_name().into_string() {
555 if let Ok(session) = file_name.parse::<u64>() {
556 if current_session >= self.checkpoint_retention
557 && session < current_session - self.checkpoint_retention
558 {
559 fs::remove_dir_all(checkpoint.path())?;
560 }
561 }
562 }
563 }
564 } else {
565 fs::create_dir_all(&checkpoint_dir)?;
566 }
567
568 Ok(())
569 }
570
571 fn checkpoint_database(&self, session_index: u64) {
575 if self.checkpoint_retention == 0 {
578 return;
579 }
580
581 let checkpoint_dir = self.db_checkpoints_dir();
582 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
583
584 {
585 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::DEBUG);
586 match self.db.checkpoint(&session_checkpoint_dir) {
587 Ok(()) => {
588 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
589 }
590 Err(e) => {
591 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
592 }
593 }
594 }
595
596 {
597 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::DEBUG);
599 if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
600 warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
601 }
602 }
603 }
604
605 fn delete_old_database_checkpoint(
608 &self,
609 session_index: u64,
610 checkpoint_dir: &Path,
611 ) -> anyhow::Result<()> {
612 if self.checkpoint_retention > session_index {
613 return Ok(());
614 }
615
616 let delete_session_index = session_index - self.checkpoint_retention;
617 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
618 if checkpoint_to_delete.exists() {
619 fs::remove_dir_all(checkpoint_to_delete)?;
620 }
621
622 Ok(())
623 }
624
625 #[instrument(target = "fm::consensus", skip(self, item), level = "info")]
626 pub async fn process_consensus_item(
627 &self,
628 session_index: u64,
629 item_index: u64,
630 item: ConsensusItem,
631 peer: PeerId,
632 ) -> anyhow::Result<()> {
633 let peer_id_str = &self.peer_id_str[peer.to_usize()];
634 let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
635 let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
636 .with_label_values(&[peer_id_str])
637 .start_timer();
638
639 debug!(%peer, item = ?DebugConsensusItem(&item), "Processing consensus item");
640
641 self.last_ci_by_peer
642 .write()
643 .await
644 .insert(peer, session_index);
645
646 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
647 .with_label_values(&[&self.self_id_str, peer_id_str])
648 .set(session_index as i64);
649
650 let mut dbtx = self.db.begin_transaction().await;
651
652 dbtx.ignore_uncommitted();
653
654 if let Some(existing_item) = dbtx
658 .get_value(&AcceptedItemKey(item_index.to_owned()))
659 .await
660 {
661 if existing_item.item == item && existing_item.peer == peer {
662 return Ok(());
663 }
664
665 bail!("Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}", existing_item.peer);
666 }
667
668 self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
669 .await?;
670
671 dbtx.warn_uncommitted();
674
675 dbtx.insert_entry(&AcceptedItemKey(item_index), &AcceptedItem { item, peer })
676 .await;
677
678 let mut audit = Audit::default();
679
680 for (module_instance_id, kind, module) in self.modules.iter_modules() {
681 let _module_audit_timing =
682 TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
683
684 let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
685 .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
686 .start_timer();
687
688 module
689 .audit(
690 &mut dbtx
691 .to_ref_with_prefix_module_id(module_instance_id)
692 .0
693 .into_nc(),
694 &mut audit,
695 module_instance_id,
696 )
697 .await;
698 timing_prom.observe_duration();
699 }
700
701 assert!(
702 audit
703 .net_assets()
704 .expect("Overflow while checking balance sheet")
705 .milli_sat
706 >= 0,
707 "Balance sheet of the fed has gone negative, this should never happen! {audit}"
708 );
709
710 dbtx.commit_tx_result()
711 .await
712 .expect("Committing consensus epoch failed");
713
714 CONSENSUS_ITEMS_PROCESSED_TOTAL
715 .with_label_values(&[&self.peer_id_str[peer.to_usize()]])
716 .inc();
717 timing_prom.observe_duration();
718
719 Ok(())
720 }
721
722 async fn process_consensus_item_with_db_transaction(
723 &self,
724 dbtx: &mut DatabaseTransaction<'_>,
725 consensus_item: ConsensusItem,
726 peer_id: PeerId,
727 ) -> anyhow::Result<()> {
728 self.decoders().assert_reject_mode();
731
732 match consensus_item {
733 ConsensusItem::Module(module_item) => {
734 let instance_id = module_item.module_instance_id();
735
736 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
737
738 self.modules
739 .get_expect(instance_id)
740 .process_consensus_item(module_dbtx, &module_item, peer_id)
741 .await
742 }
743 ConsensusItem::Transaction(transaction) => {
744 let txid = transaction.tx_hash();
745 if dbtx
746 .get_value(&AcceptedTransactionKey(txid))
747 .await
748 .is_some()
749 {
750 debug!(target: LOG_CONSENSUS, %txid, "Transaction already accepted");
751 bail!("Transaction is already accepted");
752 }
753
754 let modules_ids = transaction
755 .outputs
756 .iter()
757 .map(DynOutput::module_instance_id)
758 .collect::<Vec<_>>();
759
760 process_transaction_with_dbtx(self.modules.clone(), dbtx, &transaction)
761 .await
762 .map_err(|error| anyhow!(error.to_string()))?;
763
764 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
765 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
766 .await;
767
768 Ok(())
769 }
770 ConsensusItem::Default { variant, .. } => {
771 warn!(
772 target: LOG_CONSENSUS,
773 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
774 );
775
776 panic!("Unexpected consensus item type: {variant}")
777 }
778 }
779 }
780
781 async fn request_signed_session_outcome(
782 &self,
783 federation_api: &DynGlobalApi,
784 index: u64,
785 ) -> SignedSessionOutcome {
786 let decoders = self.decoders();
787 let keychain = Keychain::new(&self.cfg);
788 let threshold = self.num_peers().threshold();
789
790 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| match response
791 .try_into_inner(&decoders)
792 {
793 Ok(signed_session_outcome) => {
794 if signed_session_outcome.signatures.len() == threshold
795 && signed_session_outcome
796 .signatures
797 .iter()
798 .all(|(peer_id, sig)| {
799 keychain.verify(
800 &signed_session_outcome.session_outcome.header(index),
801 sig,
802 to_node_index(*peer_id),
803 )
804 })
805 {
806 Ok(signed_session_outcome)
807 } else {
808 Err(anyhow!("Invalid signatures"))
809 }
810 }
811 Err(error) => Err(anyhow!(error.to_string())),
812 };
813
814 loop {
815 let result = federation_api
816 .request_with_strategy(
817 FilterMap::new(filter_map.clone(), self.num_peers()),
818 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
819 ApiRequestErased::new(index),
820 )
821 .await;
822
823 match result {
824 Ok(signed_session_outcome) => return signed_session_outcome,
825 Err(err) => {
826 if !err.to_string().contains("Rpc error: Request timeout") {
828 warn!(target: LOG_CONSENSUS, %err, "Error while requesting signed session outcome");
829 }
830 }
831 }
832 }
833 }
834
835 async fn get_finished_session_count(&self) -> u64 {
838 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
839 }
840}
841
842pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
843 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
844 .await
845 .next()
846 .await
847 .map_or(0, |entry| (entry.0 .0) + 1)
848}