fedimint_server/consensus/engine.rs

use std::collections::BTreeMap;
use std::fs;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, P2PConnectionStatus, PeerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::fmt_utils::OptStacktrace;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::{DynP2PConnections, IP2PConnections};
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{sleep, TaskGroup, TaskHandle};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::FmtCompact as _;
use fedimint_core::{timing, NumPeers, NumPeersExt, PeerId};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::watch;
use tracing::{debug, info, instrument, trace, warn, Level};

use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{get_citem_bytes_chsum, DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::{to_node_index, Message};
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{process_transaction_with_dbtx, TxProcessingMode};
use crate::fedimint_core::encoding::Encodable;
use crate::metrics::{
    CONSENSUS_ITEMS_PROCESSED_TOTAL, CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ORDERING_LATENCY_SECONDS,
    CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX, CONSENSUS_SESSION_COUNT,
};
use crate::net::p2p::ReconnectP2PConnections;
use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
use crate::LOG_CONSENSUS;

// The name of the directory where the database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub p2p_status_senders: BTreeMap<PeerId, watch::Sender<P2PConnectionStatus>>,
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    /// Just a string version of `cfg.local.identity` for performance
    pub self_id_str: String,
    /// Just a string version of peer ids for performance
    pub peer_id_str: Vec<String>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub checkpoint_retention: u64,
    pub p2p_bind_addr: SocketAddr,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

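    /// Runs either the single-guardian loop or the multi-peer consensus loop,
    /// depending on the number of peers in the federation.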
    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.p2p_bind_addr, self.task_group.make_handle())
                .await
        }
    }

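    /// Runs the consensus loop for a federation with a single guardian, where
    /// no atomic broadcast is required and we sign every session outcome ourselves.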
    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // We only check the elapsed time after an item arrives, so we rely
                // on the periodically submitted module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

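    /// Runs the multi-peer consensus loop: establishes the P2P connections for
    /// the atomic broadcast and then drives one AlephBFT session after another.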
    pub async fn run_consensus(
        &self,
        p2p_bind_addr: SocketAddr,
        task_handle: TaskHandle,
    ) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.num_peers().total() >= 4);

        self.confirm_server_config_consensus_hash().await?;

        // Build P2P connections for the atomic broadcast
        let connector = TlsTcpConnector::new(
            self.cfg.tls_config(),
            p2p_bind_addr,
            self.cfg
                .local
                .p2p_endpoints
                .iter()
                .map(|(&id, endpoint)| (id, endpoint.url.clone()))
                .collect(),
            self.identity(),
        )
        .into_dyn();

        let connections = ReconnectP2PConnections::new(
            self.cfg.local.identity,
            connector,
            &self.task_group,
            Some(self.p2p_status_senders.clone()),
        )
        .await
        .into_dyn();

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");

            self.run_session(connections.clone(), session_index).await?;

            info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

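    /// Verifies that our consensus config hash matches the one reported by our
    /// peers, retrying until the check succeeds and bailing on a mismatch.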
    async fn confirm_server_config_consensus_hash(&self) -> anyhow::Result<()> {
        let our_hash = self.cfg.consensus.consensus_hash();

        info!(target: LOG_CONSENSUS, "Waiting for peers config {our_hash}");

        loop {
            match self.federation_api.server_config_consensus_hash().await {
                Ok(consensus_hash) => {
                    if consensus_hash != our_hash {
                        bail!("Our consensus config doesn't match peers!")
                    }

                    info!(target: LOG_CONSENSUS, "Confirmed peers config {our_hash}");

                    return Ok(());
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, "Could not check consensus config hash: {}", OptStacktrace(e));
                }
            }

            sleep(Duration::from_millis(100)).await;
        }
    }

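    /// Runs a single AlephBFT session: spawns the broadcast instance, collects
    /// the ordered items into a threshold-signed session outcome and persists it.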
    pub async fn run_session(
        &self,
        connections: DynP2PConnections<Message>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number of
        // units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // the end of the aleph bft session would only be reached after a minimum
        // of 10 years. In case of such an attack the broadcast stops ordering any
        // items until the attack subsides as no items are ordered while the
        // signatures are collected. The maximum RAM consumption of the aleph bft
        // broadcast instance is therefore bounded by:
        //
        // self.keychain.peer_count()
        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
        //      * ALEPH_BFT_UNIT_BYTE_LIMIT
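        //
        // As a purely illustrative calculation (the rounds-per-session and unit
        // byte limit below are assumptions, not the actual configured defaults):
        // with 4 peers, an assumed 5_000 rounds per session, the 1_000 slowdown
        // rounds and an assumed 50 kB unit byte limit, this bound works out to
        // 4 * 6_000 * 50_000 B ≈ 1.2 GB.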

        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });
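
        // Until the configured rounds per session are exhausted the exponent is
        // saturated to zero, so the delay stays at roughly `round_delay` (times a
        // jitter factor between 0.5 and 1.5). As an illustration with an assumed
        // round delay of 250 ms: 100 rounds into the slowdown the delay is about
        // 250 ms * 1.02^100 ≈ 1.8 s, and it keeps growing exponentially from there.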

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // we can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session are bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

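    /// Builds the session outcome from the ordered units, submits our signature
    /// for its header and collects signatures from our peers until a threshold
    /// is reached, or returns a signed session outcome downloaded from a peer
    /// that has already completed the session.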
    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<(Instant, u64)>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        // It is guaranteed that aleph bft will always replay all previously processed
        // items from the current session from index zero
        let mut item_index = 0;

        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        // We build a session outcome out of the ordered batches until either we have
        // processed broadcast_rounds_per_session rounds or a threshold signed
        // session outcome is obtained from our peers
        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            loop {
                                match timestamp_receiver.try_recv() {
                                    Ok((timestamp, chsum)) => {
                                        if get_citem_bytes_chsum(&bytes) == chsum {
                                            CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                            break;
                                        }
                                        warn!(target: LOG_CONSENSUS, "Not reporting ordering latency on possibly out of sync item");
                                    }
                                    Err(err) => {
                                        debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal on start");
                                        break;
                                    }
                                }
                            }
                        }

                        if let Ok(items) = Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()){
                            for item in items {
                                if self.process_consensus_item(
                                    session_index,
                                    item_index,
                                    item.clone(),
                                    ordered_unit.creator
                                ).await
                                .is_ok() {
                                    item_index += 1;
                                }
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
                    let pending_accepted_items = self.pending_accepted_items().await;

                    // This panics if we have more accepted items than the signed session outcome contains
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        let keychain = Keychain::new(&self.cfg);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.num_peers().threshold() {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
                            signatures.insert(ordered_unit.creator, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                signed_session_outcome = &mut request_signed_session_outcome => {
                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

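    /// Returns the consensus items that have already been accepted in the
    /// current session and persisted in the database.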
    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

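    /// Completes the session in a single database transaction: drops the AlephBFT
    /// backup units and the accepted items of the session and stores its signed
    /// session outcome.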
    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    /// Returns the full path where the database checkpoints are stored.
    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    /// Creates the checkpoint directory within the data directory if it does not
    /// exist yet; otherwise deletes all checkpoints older than `current_session` -
    /// `checkpoint_retention`.
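    ///
    /// E.g. with `checkpoint_retention = 3` and `current_session = 10`, the
    /// checkpoints for sessions `0..=6` are deleted.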
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints up to `current_session`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                // Validate that the directory is a session index
                if let Ok(file_name) = checkpoint.file_name().into_string() {
                    if let Ok(session) = file_name.parse::<u64>() {
                        if current_session >= self.checkpoint_retention
                            && session < current_session - self.checkpoint_retention
                        {
                            fs::remove_dir_all(checkpoint.path())?;
                        }
                    }
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    /// Creates a backup of the database in the checkpoint directory. These
    /// checkpoints can be used to restore the database in case the
    /// federation falls out of consensus (recommended for experts only).
    fn checkpoint_database(&self, session_index: u64) {
        // If `checkpoint_retention` has been turned off, don't checkpoint the database
        // at all.
        if self.checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::DEBUG);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
                }
            }
        }

        {
            // Check if any old checkpoints need to be cleaned up
            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::DEBUG);
            if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
            }
        }
    }

    /// Deletes the database checkpoint directory for the session with index
    /// `session_index` - `checkpoint_retention`, if it exists.
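    ///
    /// E.g. with `checkpoint_retention = 3`, completing session 10 deletes the
    /// checkpoint for session 7.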
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

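    /// Processes a single consensus item: applies it to the database, records it
    /// as accepted and runs the module audits, rejecting the item (and leaving
    /// the database untouched) if processing fails.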
    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let peer_id_str = &self.peer_id_str[peer.to_usize()];
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[peer_id_str])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .unwrap_or_else(|| panic!("No ci status sender for peer {peer}"))
            .send(Some(session_index))
            .inspect_err(|e| warn!(target: LOG_CONSENSUS, "Failed to update ci status {e}"))
            .ok();

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[&self.self_id_str, peer_id_str])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash aleph bft will replay the units that
        // were already processed before the crash. We therefore skip all consensus
        // items until we have seen all previously accepted items again.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!("Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}", existing_item.peer);
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;
            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&self.peer_id_str[peer.to_usize()]])
            .inc();
        timing_prom.observe_duration();

        Ok(())
    }

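    /// Applies a single consensus item to the supplied database transaction,
    /// dispatching module items to the owning module and validating transactions
    /// before recording them as accepted.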
    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // a peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

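    /// Requests the threshold-signed outcome for the given session from our
    /// peers, retrying indefinitely and only accepting a response whose
    /// signatures verify against the session outcome header.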
    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
            {
                Ok(signed_session_outcome)
            } else {
                Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
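    // The highest stored key is the index of the last completed session, so the
    // count is that index plus one; e.g. if sessions 0..=41 are stored this
    // returns 42, which is also the index of the currently running session.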
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0 .0) + 1)
}