fedimint_server/consensus/engine.rs

use std::collections::BTreeMap;
use std::fs;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerConnectionStatus};
use fedimint_api_client::query::FilterMap;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::fmt_utils::OptStacktrace;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::{ModuleDecoderRegistry, ServerModuleRegistry};
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{sleep, TaskGroup, TaskHandle};
use fedimint_core::timing::TimeReporter;
use fedimint_core::{timing, NumPeers, NumPeersExt, PeerId};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::{watch, RwLock};
use tracing::{debug, info, instrument, warn, Level};

use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{get_citem_bytes_chsum, DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::{to_node_index, Message};
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::process_transaction_with_dbtx;
use crate::fedimint_core::encoding::Encodable;
use crate::metrics::{
    CONSENSUS_ITEMS_PROCESSED_TOTAL, CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ORDERING_LATENCY_SECONDS,
    CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX, CONSENSUS_SESSION_COUNT,
};
use crate::net::connect::{Connector, TlsTcpConnector};
use crate::net::peers::{DelayCalculator, ReconnectPeerConnections};
use crate::LOG_CONSENSUS;

// The name of the directory where the database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub last_ci_by_peer: Arc<RwLock<BTreeMap<PeerId, u64>>>,
    /// Just a string version of `cfg.local.identity` for performance
    pub self_id_str: String,
    /// Just a string version of peer ids for performance
    pub peer_id_str: Vec<String>,
    pub connection_status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub checkpoint_retention: u64,
    pub p2p_bind_addr: SocketAddr,
}
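
// How the engine is typically driven (a sketch only; the actual construction
// of the struct happens in the server startup code and every value below is a
// placeholder):
//
//     let engine = ConsensusEngine { cfg, db, modules, /* ...remaining fields... */ };
//     engine.run().await?;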

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

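    /// Dispatches to the single guardian loop if the federation consists of
    /// exactly one peer and to the full aleph bft consensus otherwise.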
    #[instrument(name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.p2p_bind_addr, self.task_group.make_handle())
                .await
        }
    }

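    /// Runs consensus for a single guardian federation: every submitted item
    /// is processed directly and each session outcome is signed by us alone.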
    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // we rely on the module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

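    /// Runs the aleph bft atomic broadcast with our peers session by session
    /// until a shutdown is requested via the shutdown receiver.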
    pub async fn run_consensus(
        &self,
        p2p_bind_addr: SocketAddr,
        task_handle: TaskHandle,
    ) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.num_peers().total() >= 4);

        self.confirm_server_config_consensus_hash().await?;

        // Build P2P connections for the atomic broadcast
        let connections = ReconnectPeerConnections::new(
            self.cfg.network_config(p2p_bind_addr),
            DelayCalculator::PROD_DEFAULT,
            TlsTcpConnector::new(self.cfg.tls_config(), self.identity()).into_dyn(),
            &self.task_group,
            Arc::clone(&self.connection_status_channels),
        )
        .await;

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            self.run_session(connections.clone(), session_index).await?;

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    async fn confirm_server_config_consensus_hash(&self) -> anyhow::Result<()> {
        let our_hash = self.cfg.consensus.consensus_hash();

        info!(target: LOG_CONSENSUS, "Waiting for peers config {our_hash}");

        loop {
            match self.federation_api.server_config_consensus_hash().await {
                Ok(consensus_hash) => {
                    if consensus_hash != our_hash {
                        bail!("Our consensus config doesn't match peers!")
                    }

                    info!(target: LOG_CONSENSUS, "Confirmed peers config {our_hash}");

                    return Ok(());
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, "Could not check consensus config hash: {}", OptStacktrace(e));
                }
            }

            sleep(Duration::from_millis(100)).await;
        }
    }

    pub async fn run_session(
        &self,
        connections: ReconnectPeerConnections<Message>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number of
        // units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome, we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // the end of the aleph bft session would only be reached after a minimum
        // of 10 years. In case of such an attack the broadcast stops ordering any
        // items until the attack subsides, as no items are ordered while the
        // signatures are collected. The maximum RAM consumption of the aleph bft
        // broadcast instance is therefore bounded by:
        //
        // self.keychain.peer_count()
        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
        //      * ALEPH_BFT_UNIT_BYTE_LIMIT
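        //
        // As a rough worked example (all numbers hypothetical except the
        // EXP_SLOWDOWN_ROUNDS constant below): with 4 peers, 5_000 broadcast
        // rounds per session, 1_000 slowdown rounds and a unit byte limit of
        // 50 KiB, the bound comes out to 4 * 6_000 * 50 KiB, i.e. roughly 1.2 GB.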

        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });
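
        // As a worked example of the schedule above (with a hypothetical round
        // delay of 250 ms and 5_000 rounds per session): every regular round is
        // delayed by roughly 250 ms, scaled by the 0.5..=1.5 jitter; 100 rounds
        // into the slowdown the delay grows to about 250 ms * 1.02^100, roughly
        // 1.8 s, and 1_000 rounds in it reaches 250 ms * 1.02^1_000, on the
        // order of years, which is what pushes the end of the session past the
        // 10 year horizon.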

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // we can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session are bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<(Instant, u64)>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        // It is guaranteed that aleph bft will always replay all previously processed
        // items from the current session from index zero
        let mut item_index = 0;

        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        // We build a session outcome out of the ordered batches until either we have
        // processed broadcast_rounds_per_session rounds or a threshold signed
        // session outcome is obtained from our peers
        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            loop {
                                match timestamp_receiver.try_recv() {
                                    Ok((timestamp, chsum)) => {
                                        if get_citem_bytes_chsum(&bytes) == chsum {
                                            CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                            break;
                                        }
                                        warn!(target: LOG_CONSENSUS, "Not reporting ordering latency on possibly out of sync item");
                                    }
                                    Err(err) => {
                                        debug!(target: LOG_CONSENSUS, %err, "Missing submission timestamp. This is normal on start");
                                        break;
                                    }
                                }
                            }
                        }

                        if let Ok(items) = Vec::<ConsensusItem>::consensus_decode(&mut bytes.as_slice(), &self.decoders()) {
                            for item in items {
                                if self.process_consensus_item(
                                    session_index,
                                    item_index,
                                    item.clone(),
                                    ordered_unit.creator
                                ).await
                                .is_ok() {
                                    item_index += 1;
                                }
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
                    let pending_accepted_items = self.pending_accepted_items().await;

                    // this panics if we have more accepted items than the signed session outcome
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        let keychain = Keychain::new(&self.cfg);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
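        // (with 4 peers, for example, this loop needs 3 valid signatures,
        // assuming the usual n - floor((n - 1) / 3) BFT threshold behind
        // `NumPeers::threshold()`)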
        while signatures.len() < self.num_peers().threshold() {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)) {
                            signatures.insert(ordered_unit.creator, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                signed_session_outcome = &mut request_signed_session_outcome => {
                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

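    /// Returns the items accepted so far in the currently running session, in
    /// the order in which they were processed.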
    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

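    /// Persists the signed outcome of the session at `session_index` and
    /// removes the session's aleph bft backup units and accepted items in the
    /// same database transaction.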
    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    /// Returns the full path where the database checkpoints are stored.
    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    /// Creates the directory for storing database checkpoints within the data
    /// directory, or, if it already exists, deletes all checkpoints older than
    /// `current_session` - `checkpoint_retention`.
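    ///
    /// For example, with a `checkpoint_retention` of 3 and a `current_session`
    /// of 10, the checkpoints for sessions 0 through 6 are deleted while those
    /// for sessions 7, 8 and 9 are kept.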
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                ?current_session,
                "Removing database checkpoints up to `current_session`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                // Validate that the directory is a session index
                if let Ok(file_name) = checkpoint.file_name().into_string() {
                    if let Ok(session) = file_name.parse::<u64>() {
                        if current_session >= self.checkpoint_retention
                            && session < current_session - self.checkpoint_retention
                        {
                            fs::remove_dir_all(checkpoint.path())?;
                        }
                    }
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    /// Creates a backup of the database in the checkpoint directory. These
    /// checkpoints can be used to restore the database in case the
    /// federation falls out of consensus (recommended for experts only).
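    ///
    /// Each checkpoint is written to `<data_dir>/db_checkpoints/<session_index>`
    /// and the checkpoint from `checkpoint_retention` sessions ago is deleted
    /// afterwards.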
    fn checkpoint_database(&self, session_index: u64) {
        // If `checkpoint_retention` has been turned off, don't checkpoint the database
        // at all.
        if self.checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::DEBUG);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
                }
            }
        }

        {
            // Check if any old checkpoints need to be cleaned up
            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::DEBUG);
            if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
            }
        }
    }

    /// Deletes the database checkpoint directory for session `session_index` -
    /// `checkpoint_retention`, if it exists.
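    ///
    /// For example, with a `checkpoint_retention` of 3 and a `session_index`
    /// of 10, this removes the checkpoint directory named `7`.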
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

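    /// Processes a single consensus item ordered at `item_index` of session
    /// `session_index`. When aleph bft replays items after a crash, items that
    /// were already accepted are acknowledged without being reprocessed,
    /// previously discarded items are rejected again, and new items are
    /// committed together with a balance sheet audit of every module.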
    #[instrument(target = "fm::consensus", skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let peer_id_str = &self.peer_id_str[peer.to_usize()];
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[peer_id_str])
            .start_timer();

        debug!(%peer, item = ?DebugConsensusItem(&item), "Processing consensus item");

        self.last_ci_by_peer
            .write()
            .await
            .insert(peer, session_index);

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[&self.self_id_str, peer_id_str])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash, aleph bft will replay the units that
        // were already processed before the crash. We therefore skip all consensus
        // items until we have seen every previously accepted item again.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!("Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}", existing_item.peer);
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(&AcceptedItemKey(item_index), &AcceptedItem { item, peer })
            .await;

        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;
            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&self.peer_id_str[peer.to_usize()]])
            .inc();
        timing_prom.observe_duration();

        Ok(())
    }

    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(target: LOG_CONSENSUS, %txid, "Transaction already accepted");
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(self.modules.clone(), dbtx, &transaction)
                    .await
                    .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

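    /// Queries our peers for the signed outcome of the session at `index` and
    /// keeps retrying until a response carrying a valid threshold signature is
    /// received.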
    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| match response
            .try_into_inner(&decoders)
        {
            Ok(signed_session_outcome) => {
                if signed_session_outcome.signatures.len() == threshold
                    && signed_session_outcome
                        .signatures
                        .iter()
                        .all(|(peer_id, sig)| {
                            keychain.verify(
                                &signed_session_outcome.session_outcome.header(index),
                                sig,
                                to_node_index(*peer_id),
                            )
                        })
                {
                    Ok(signed_session_outcome)
                } else {
                    Err(anyhow!("Invalid signatures"))
                }
            }
            Err(error) => Err(anyhow!(error.to_string())),
        };

        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone(), self.num_peers()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(err) => {
                    // Workaround: timeout messages here are annoying
                    if !err.to_string().contains("Rpc error: Request timeout") {
                        warn!(target: LOG_CONSENSUS, %err, "Error while requesting signed session outcome");
                    }
                }
            }
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

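/// Returns the number of finished sessions, i.e. the index of the highest
/// stored [`SignedSessionOutcome`] plus one, or zero if none has been stored
/// yet.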
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0 .0) + 1)
}