1use std::collections::BTreeMap;
2use std::fs;
3use std::net::SocketAddr;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use aleph_bft::Keychain as KeychainTrait;
9use anyhow::{anyhow, bail};
10use async_channel::Receiver;
11use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, P2PConnectionStatus, PeerError};
12use fedimint_api_client::query::FilterMap;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::fmt_utils::OptStacktrace;
19use fedimint_core::module::audit::Audit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
22use fedimint_core::net::peers::{DynP2PConnections, IP2PConnections};
23use fedimint_core::runtime::spawn;
24use fedimint_core::session_outcome::{
25 AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
26};
27use fedimint_core::task::{sleep, TaskGroup, TaskHandle};
28use fedimint_core::timing::TimeReporter;
29use fedimint_core::util::FmtCompact as _;
30use fedimint_core::{timing, NumPeers, NumPeersExt, PeerId};
31use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
32use futures::StreamExt;
33use rand::Rng;
34use tokio::sync::watch;
35use tracing::{debug, info, instrument, trace, warn, Level};
36
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{get_citem_bytes_chsum, DataProvider, UnitData};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::{to_node_index, Message};
45use crate::consensus::db::{
46 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{process_transaction_with_dbtx, TxProcessingMode};
51use crate::fedimint_core::encoding::Encodable;
52use crate::metrics::{
53 CONSENSUS_ITEMS_PROCESSED_TOTAL, CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
54 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ORDERING_LATENCY_SECONDS,
55 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX, CONSENSUS_SESSION_COUNT,
56};
57use crate::net::p2p::ReconnectP2PConnections;
58use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
59use crate::LOG_CONSENSUS;
60
/// Name of the sub-directory of the engine's `data_dir` in which the
/// per-session database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
63
/// Drives the consensus of a single guardian: it runs one session after the
/// other, processes the ordered consensus items, persists the signed session
/// outcomes and maintains database checkpoints.
pub struct ConsensusEngine {
    /// Registry of the server modules; provides the decoders, processes
    /// module consensus items and is audited after every accepted item.
    pub modules: ServerModuleRegistry,
    /// Database holding accepted items, signed session outcomes and the
    /// state written while processing consensus items.
    pub db: Database,
    /// API client used to query the peers for their consensus config hash
    /// and for already signed session outcomes.
    pub federation_api: DynGlobalApi,
    /// Server configuration (local identity, p2p endpoints, consensus
    /// parameters).
    pub cfg: ServerConfig,
    /// Receiving end of the channel through which consensus items are
    /// submitted to the engine.
    pub submission_receiver: Receiver<ConsensusItem>,
    /// When this holds `Some(index)`, the engine stops after it has
    /// completed the session with that index.
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    /// Per-peer senders handed to the p2p connection layer when consensus
    /// with peers is started.
    pub p2p_status_senders: BTreeMap<PeerId, watch::Sender<P2PConnectionStatus>>,
    /// Per-peer senders that are updated with the current session index
    /// whenever a consensus item created by that peer is processed.
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    /// Metric label value identifying this guardian.
    // NOTE(review): presumably the string form of `cfg.local.identity` — confirm at the construction site.
    pub self_id_str: String,
    /// Metric label values for the peers, indexed by `PeerId::to_usize()`.
    pub peer_id_str: Vec<String>,
    /// Task group from which task handles and sub-groups for the sessions
    /// are created.
    pub task_group: TaskGroup,
    /// Base directory under which the database checkpoint directory lives.
    pub data_dir: PathBuf,
    /// Distance, in sessions, after which an old checkpoint is deleted; a
    /// value of `0` disables checkpointing entirely.
    pub checkpoint_retention: u64,
    /// Socket address the p2p connector is given as its bind address.
    pub p2p_bind_addr: SocketAddr,
}
83
84impl ConsensusEngine {
85 fn num_peers(&self) -> NumPeers {
86 self.cfg.consensus.broadcast_public_keys.to_num_peers()
87 }
88
89 fn identity(&self) -> PeerId {
90 self.cfg.local.identity
91 }
92
93 #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
94 pub async fn run(self) -> anyhow::Result<()> {
95 if self.num_peers().total() == 1 {
96 self.run_single_guardian(self.task_group.make_handle())
97 .await
98 } else {
99 self.run_consensus(self.p2p_bind_addr, self.task_group.make_handle())
100 .await
101 }
102 }
103
104 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
105 assert_eq!(self.num_peers(), NumPeers::from(1));
106
107 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
108
109 while !task_handle.is_shutting_down() {
110 let session_index = self.get_finished_session_count().await;
111
112 CONSENSUS_SESSION_COUNT.set(session_index as i64);
113
114 let mut item_index = self.pending_accepted_items().await.len() as u64;
115
116 let session_start_time = std::time::Instant::now();
117
118 while let Ok(item) = self.submission_receiver.recv().await {
119 if self
120 .process_consensus_item(session_index, item_index, item, self.identity())
121 .await
122 .is_ok()
123 {
124 item_index += 1;
125 }
126
127 if session_start_time.elapsed() > Duration::from_secs(60) {
129 break;
130 }
131 }
132
133 let session_outcome = SessionOutcome {
134 items: self.pending_accepted_items().await,
135 };
136
137 let header = session_outcome.header(session_index);
138 let signature = Keychain::new(&self.cfg).sign(&header);
139 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
140
141 self.complete_session(
142 session_index,
143 SignedSessionOutcome {
144 session_outcome,
145 signatures,
146 },
147 )
148 .await;
149
150 self.checkpoint_database(session_index);
151
152 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
153
154 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
155 break;
156 }
157 }
158
159 info!(target: LOG_CONSENSUS, "Consensus task shut down");
160
161 Ok(())
162 }
163
164 pub async fn run_consensus(
165 &self,
166 p2p_bind_addr: SocketAddr,
167 task_handle: TaskHandle,
168 ) -> anyhow::Result<()> {
169 assert!(self.num_peers().total() >= 4);
171
172 self.confirm_server_config_consensus_hash().await?;
173
174 let connector = TlsTcpConnector::new(
176 self.cfg.tls_config(),
177 p2p_bind_addr,
178 self.cfg
179 .local
180 .p2p_endpoints
181 .iter()
182 .map(|(&id, endpoint)| (id, endpoint.url.clone()))
183 .collect(),
184 self.identity(),
185 )
186 .into_dyn();
187
188 let connections = ReconnectP2PConnections::new(
189 self.cfg.local.identity,
190 connector,
191 &self.task_group,
192 Some(self.p2p_status_senders.clone()),
193 )
194 .await
195 .into_dyn();
196
197 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
198
199 while !task_handle.is_shutting_down() {
200 let session_index = self.get_finished_session_count().await;
201
202 CONSENSUS_SESSION_COUNT.set(session_index as i64);
203
204 info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");
205
206 self.run_session(connections.clone(), session_index).await?;
207
208 info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");
209
210 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
211 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
212
213 sleep(Duration::from_secs(60)).await;
214
215 break;
216 }
217 }
218
219 info!(target: LOG_CONSENSUS, "Consensus task shut down");
220
221 Ok(())
222 }
223
224 async fn confirm_server_config_consensus_hash(&self) -> anyhow::Result<()> {
225 let our_hash = self.cfg.consensus.consensus_hash();
226
227 info!(target: LOG_CONSENSUS, "Waiting for peers config {our_hash}");
228
229 loop {
230 match self.federation_api.server_config_consensus_hash().await {
231 Ok(consensus_hash) => {
232 if consensus_hash != our_hash {
233 bail!("Our consensus config doesn't match peers!")
234 }
235
236 info!(target: LOG_CONSENSUS, "Confirmed peers config {our_hash}");
237
238 return Ok(());
239 }
240 Err(e) => {
241 warn!(target: LOG_CONSENSUS, "Could not check consensus config hash: {}", OptStacktrace(e));
242 }
243 }
244
245 sleep(Duration::from_millis(100)).await;
246 }
247 }
248
249 pub async fn run_session(
250 &self,
251 connections: DynP2PConnections<Message>,
252 session_index: u64,
253 ) -> anyhow::Result<()> {
254 const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
272 const BASE: f64 = 1.02;
273
274 let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
275 let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
276
277 let mut delay_config = aleph_bft::default_delay_config();
278
279 delay_config.unit_creation_delay = Arc::new(move |round_index| {
280 let delay = if round_index == 0 {
281 0.0
282 } else {
283 round_delay
284 * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
285 * rand::thread_rng().gen_range(0.5..=1.5)
286 };
287
288 Duration::from_millis(delay.round() as u64)
289 });
290
291 let config = aleph_bft::create_config(
292 self.num_peers().total().into(),
293 self.identity().to_usize().into(),
294 session_index,
295 self.cfg
296 .consensus
297 .broadcast_rounds_per_session
298 .checked_add(EXP_SLOWDOWN_ROUNDS)
299 .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
300 delay_config,
301 Duration::from_secs(10 * 365 * 24 * 60 * 60),
302 )
303 .expect("The exponential slowdown exceeds 10 years");
304
305 let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
308 let (signature_sender, signature_receiver) = watch::channel(None);
309 let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
310 let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
311
312 let aleph_handle = spawn(
313 "aleph run session",
314 aleph_bft::run_session(
315 config,
316 aleph_bft::LocalIO::new(
317 DataProvider::new(
318 self.submission_receiver.clone(),
319 signature_receiver,
320 timestamp_sender,
321 ),
322 FinalizationHandler::new(unit_data_sender),
323 BackupWriter::new(self.db.clone()).await,
324 BackupReader::new(self.db.clone()),
325 ),
326 Network::new(connections),
327 Keychain::new(&self.cfg),
328 Spawner::new(self.task_group.make_subgroup()),
329 aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
330 ),
331 );
332
333 let signed_session_outcome = self
334 .complete_signed_session_outcome(
335 session_index,
336 unit_data_receiver,
337 signature_sender,
338 timestamp_receiver,
339 )
340 .await?;
341
342 terminator_sender.send(()).ok();
345 aleph_handle.await.ok();
346
347 self.complete_session(session_index, signed_session_outcome)
351 .await;
352
353 self.checkpoint_database(session_index);
354
355 Ok(())
356 }
357
/// Builds the signed session outcome for session `session_index`.
///
/// The function works in two phases, and in both of them it concurrently
/// asks the federation for an already signed outcome of this session:
///
/// 1. Ordered units are received and the consensus items of their batches
///    are processed until a unit of a round greater than or equal to
///    `broadcast_rounds_per_session` arrives.
/// 2. The header of the resulting session outcome is signed, the signature
///    is published through `signature_sender`, and valid signatures of the
///    peers are collected from the ordered units until the threshold is
///    reached.
///
/// If the federation returns a signed outcome first, the items not yet
/// processed locally are processed and that outcome is returned instead.
///
/// # Errors
///
/// Returns an error if the ordered unit channel is closed or the signature
/// cannot be sent.
///
/// # Panics
///
/// Panics on any disagreement between the local state and an outcome signed
/// by the federation (differing items, a rejected item, a differing header).
pub async fn complete_signed_session_outcome(
    &self,
    session_index: u64,
    ordered_unit_receiver: Receiver<OrderedUnit>,
    signature_sender: watch::Sender<Option<SchnorrSignature>>,
    timestamp_receiver: Receiver<(Instant, u64)>,
) -> anyhow::Result<SignedSessionOutcome> {
    // Items are counted from zero; `process_consensus_item` accepts an item
    // that is already stored under its index without processing it again.
    let mut item_index = 0;

    // Created once and polled by both select loops below, so the request is
    // not restarted between the two phases.
    let mut request_signed_session_outcome = Box::pin(async {
        self.request_signed_session_outcome(&self.federation_api, session_index)
            .await
    });

    // Phase one: process ordered batches until the session's rounds are
    // exhausted or the federation hands us a signed session outcome.
    loop {
        tokio::select! {
            ordered_unit = ordered_unit_receiver.recv() => {
                let ordered_unit = ordered_unit?;

                if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                    break;
                }

                if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                    // For our own units, report how long ordering took by
                    // matching the batch against the recorded submission
                    // timestamps via its checksum.
                    if ordered_unit.creator == self.identity() {
                        loop {
                            match timestamp_receiver.try_recv() {
                                Ok((timestamp, chsum)) => {
                                    if get_citem_bytes_chsum(&bytes) == chsum {
                                        CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                        break;
                                    }
                                    warn!(target: LOG_CONSENSUS, "Not reporting ordering latency on possibly out of sync item");
                                }
                                Err(err) => {
                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal on start");
                                    break;
                                }
                            }
                        }
                    }

                    // Batches that fail to decode are skipped as a whole;
                    // the item index only advances for accepted items.
                    if let Ok(items) = Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()){
                        for item in items {
                            if self.process_consensus_item(
                                session_index,
                                item_index,
                                item.clone(),
                                ordered_unit.creator
                            ).await
                            .is_ok() {
                                item_index += 1;
                            }
                        }
                    }
                }
            },
            signed_session_outcome = &mut request_signed_session_outcome => {
                let pending_accepted_items = self.pending_accepted_items().await;

                // `split_at` panics if we have accepted more items than the
                // signed session outcome contains.
                let (processed, unprocessed) = signed_session_outcome
                    .session_outcome
                    .items
                    .split_at(pending_accepted_items.len());

                assert!(
                    processed.iter().eq(pending_accepted_items.iter()),
                    "Consensus Failure: pending accepted items disagree with federation consensus"
                );

                // Catch up on the items of the outcome we have not seen yet.
                for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                    if let Err(err) = self.process_consensus_item(
                        session_index,
                        item_index as u64,
                        accepted_item.item.clone(),
                        accepted_item.peer
                    ).await {
                        panic!(
                            "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                            processed.len(),
                            unprocessed.len(),
                        );
                    }
                }

                return Ok(signed_session_outcome);
            }
        }
    }

    let items = self.pending_accepted_items().await;

    // Every accepted item must have been persisted under consecutive indices.
    assert_eq!(item_index, items.len() as u64);

    let session_outcome = SessionOutcome { items };

    let header = session_outcome.header(session_index);

    let keychain = Keychain::new(&self.cfg);

    // Publish our signature of the header through the watch channel.
    signature_sender.send(Some(keychain.sign(&header)))?;

    let mut signatures = BTreeMap::new();

    // Ensures the items are dumped to the log at most once, no matter how
    // many invalid signatures are received.
    let items_dump = tokio::sync::OnceCell::new();

    // Phase two: collect valid header signatures until the threshold is
    // reached or the federation hands us a signed session outcome.
    while signatures.len() < self.num_peers().threshold() {
        tokio::select! {
            ordered_unit = ordered_unit_receiver.recv() => {
                let ordered_unit = ordered_unit?;

                if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                    if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
                        signatures.insert(ordered_unit.creator, signature);
                    } else {
                        warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);

                        items_dump.get_or_init(|| async {
                            for (idx, item) in session_outcome.items.iter().enumerate() {
                                info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                            }
                        }).await;
                    }
                }
            }
            signed_session_outcome = &mut request_signed_session_outcome => {
                assert_eq!(
                    header,
                    signed_session_outcome.session_outcome.header(session_index),
                    "Consensus Failure: header disagrees with federation consensus"
                );

                return Ok(signed_session_outcome);
            }
        }
    }

    Ok(SignedSessionOutcome {
        session_outcome,
        signatures,
    })
}
510
511 fn decoders(&self) -> ModuleDecoderRegistry {
512 self.modules.decoder_registry()
513 }
514
515 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
516 self.db
517 .begin_transaction_nc()
518 .await
519 .find_by_prefix(&AcceptedItemPrefix)
520 .await
521 .map(|entry| entry.1)
522 .collect()
523 .await
524 }
525
526 pub async fn complete_session(
527 &self,
528 session_index: u64,
529 signed_session_outcome: SignedSessionOutcome,
530 ) {
531 let mut dbtx = self.db.begin_transaction().await;
532
533 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
534
535 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
536
537 if dbtx
538 .insert_entry(
539 &SignedSessionOutcomeKey(session_index),
540 &signed_session_outcome,
541 )
542 .await
543 .is_some()
544 {
545 panic!("We tried to overwrite a signed session outcome");
546 }
547
548 dbtx.commit_tx_result()
549 .await
550 .expect("This is the only place where we write to this key");
551 }
552
553 fn db_checkpoints_dir(&self) -> PathBuf {
555 self.data_dir.join(DB_CHECKPOINTS_DIR)
556 }
557
558 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
562 let checkpoint_dir = self.db_checkpoints_dir();
563
564 if checkpoint_dir.exists() {
565 debug!(
566 target: LOG_CONSENSUS,
567 ?current_session,
568 "Removing database checkpoints up to `current_session`"
569 );
570
571 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
572 if let Ok(file_name) = checkpoint.file_name().into_string() {
574 if let Ok(session) = file_name.parse::<u64>() {
575 if current_session >= self.checkpoint_retention
576 && session < current_session - self.checkpoint_retention
577 {
578 fs::remove_dir_all(checkpoint.path())?;
579 }
580 }
581 }
582 }
583 } else {
584 fs::create_dir_all(&checkpoint_dir)?;
585 }
586
587 Ok(())
588 }
589
590 fn checkpoint_database(&self, session_index: u64) {
594 if self.checkpoint_retention == 0 {
597 return;
598 }
599
600 let checkpoint_dir = self.db_checkpoints_dir();
601 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
602
603 {
604 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::DEBUG);
605 match self.db.checkpoint(&session_checkpoint_dir) {
606 Ok(()) => {
607 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
608 }
609 Err(e) => {
610 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
611 }
612 }
613 }
614
615 {
616 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::DEBUG);
618 if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
619 warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
620 }
621 }
622 }
623
624 fn delete_old_database_checkpoint(
627 &self,
628 session_index: u64,
629 checkpoint_dir: &Path,
630 ) -> anyhow::Result<()> {
631 if self.checkpoint_retention > session_index {
632 return Ok(());
633 }
634
635 let delete_session_index = session_index - self.checkpoint_retention;
636 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
637 if checkpoint_to_delete.exists() {
638 fs::remove_dir_all(checkpoint_to_delete)?;
639 }
640
641 Ok(())
642 }
643
644 #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
645 pub async fn process_consensus_item(
646 &self,
647 session_index: u64,
648 item_index: u64,
649 item: ConsensusItem,
650 peer: PeerId,
651 ) -> anyhow::Result<()> {
652 let peer_id_str = &self.peer_id_str[peer.to_usize()];
653 let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
654 let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
655 .with_label_values(&[peer_id_str])
656 .start_timer();
657
658 trace!(
659 target: LOG_CONSENSUS,
660 %peer,
661 item = ?DebugConsensusItem(&item),
662 "Processing consensus item"
663 );
664
665 self.ci_status_senders
666 .get(&peer)
667 .expect("No ci status sender for peer {peer}")
668 .send(Some(session_index))
669 .inspect_err(|e| warn!(target: LOG_CONSENSUS, "Failed to update ci status {e}"))
670 .ok();
671
672 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
673 .with_label_values(&[&self.self_id_str, peer_id_str])
674 .set(session_index as i64);
675
676 let mut dbtx = self.db.begin_transaction().await;
677
678 dbtx.ignore_uncommitted();
679
680 if let Some(existing_item) = dbtx
684 .get_value(&AcceptedItemKey(item_index.to_owned()))
685 .await
686 {
687 if existing_item.item == item && existing_item.peer == peer {
688 return Ok(());
689 }
690
691 bail!("Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}", existing_item.peer);
692 }
693
694 self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
695 .await?;
696
697 dbtx.warn_uncommitted();
700
701 dbtx.insert_entry(
702 &AcceptedItemKey(item_index),
703 &AcceptedItem {
704 item: item.clone(),
705 peer,
706 },
707 )
708 .await;
709
710 debug!(
711 target: LOG_CONSENSUS,
712 %peer,
713 item = ?DebugConsensusItem(&item),
714 "Processed consensus item"
715 );
716 let mut audit = Audit::default();
717
718 for (module_instance_id, kind, module) in self.modules.iter_modules() {
719 let _module_audit_timing =
720 TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
721
722 let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
723 .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
724 .start_timer();
725
726 module
727 .audit(
728 &mut dbtx
729 .to_ref_with_prefix_module_id(module_instance_id)
730 .0
731 .into_nc(),
732 &mut audit,
733 module_instance_id,
734 )
735 .await;
736 timing_prom.observe_duration();
737 }
738
739 assert!(
740 audit
741 .net_assets()
742 .expect("Overflow while checking balance sheet")
743 .milli_sat
744 >= 0,
745 "Balance sheet of the fed has gone negative, this should never happen! {audit}"
746 );
747
748 dbtx.commit_tx_result()
749 .await
750 .expect("Committing consensus epoch failed");
751
752 CONSENSUS_ITEMS_PROCESSED_TOTAL
753 .with_label_values(&[&self.peer_id_str[peer.to_usize()]])
754 .inc();
755 timing_prom.observe_duration();
756
757 Ok(())
758 }
759
760 async fn process_consensus_item_with_db_transaction(
761 &self,
762 dbtx: &mut DatabaseTransaction<'_>,
763 consensus_item: ConsensusItem,
764 peer_id: PeerId,
765 ) -> anyhow::Result<()> {
766 self.decoders().assert_reject_mode();
769
770 match consensus_item {
771 ConsensusItem::Module(module_item) => {
772 let instance_id = module_item.module_instance_id();
773
774 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
775
776 self.modules
777 .get_expect(instance_id)
778 .process_consensus_item(module_dbtx, &module_item, peer_id)
779 .await
780 }
781 ConsensusItem::Transaction(transaction) => {
782 let txid = transaction.tx_hash();
783 if dbtx
784 .get_value(&AcceptedTransactionKey(txid))
785 .await
786 .is_some()
787 {
788 debug!(
789 target: LOG_CONSENSUS,
790 %txid,
791 "Transaction already accepted"
792 );
793 bail!("Transaction is already accepted");
794 }
795
796 let modules_ids = transaction
797 .outputs
798 .iter()
799 .map(DynOutput::module_instance_id)
800 .collect::<Vec<_>>();
801
802 process_transaction_with_dbtx(
803 self.modules.clone(),
804 dbtx,
805 &transaction,
806 self.cfg.consensus.version,
807 TxProcessingMode::Consensus,
808 )
809 .await
810 .map_err(|error| anyhow!(error.to_string()))?;
811
812 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
813 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
814 .await;
815
816 Ok(())
817 }
818 ConsensusItem::Default { variant, .. } => {
819 warn!(
820 target: LOG_CONSENSUS,
821 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
822 );
823
824 panic!("Unexpected consensus item type: {variant}")
825 }
826 }
827 }
828
829 async fn request_signed_session_outcome(
830 &self,
831 federation_api: &DynGlobalApi,
832 index: u64,
833 ) -> SignedSessionOutcome {
834 let decoders = self.decoders();
835 let keychain = Keychain::new(&self.cfg);
836 let threshold = self.num_peers().threshold();
837
838 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
839 let signed_session_outcome = response
840 .try_into_inner(&decoders)
841 .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
842 let header = signed_session_outcome.session_outcome.header(index);
843 if signed_session_outcome.signatures.len() == threshold
844 && signed_session_outcome
845 .signatures
846 .iter()
847 .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
848 {
849 Ok(signed_session_outcome)
850 } else {
851 Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
852 }
853 };
854
855 loop {
856 let result = federation_api
857 .request_with_strategy(
858 FilterMap::new(filter_map.clone()),
859 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
860 ApiRequestErased::new(index),
861 )
862 .await;
863
864 match result {
865 Ok(signed_session_outcome) => return signed_session_outcome,
866 Err(error) => {
867 error.report_if_unusual("Requesting Session Outcome");
868 }
869 }
870 }
871 }
872
873 async fn get_finished_session_count(&self) -> u64 {
876 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
877 }
878}
879
880pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
881 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
882 .await
883 .next()
884 .await
885 .map_or(0, |entry| (entry.0 .0) + 1)
886}