fedimint_server/consensus/
db.rs

1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::{DynInput, DynModuleConsensusItem, DynOutput, ModuleInstanceId};
5use fedimint_core::db::{
6    CoreMigrationFn, DatabaseVersion, IDatabaseTransactionOpsCoreTyped, MigrationContext,
7    MODULE_GLOBAL_PREFIX,
8};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::epoch::ConsensusItem;
11use fedimint_core::module::ModuleCommon;
12use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
13use fedimint_core::util::BoxStream;
14use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record, TransactionId};
15use futures::StreamExt;
16use serde::Serialize;
17use strum_macros::EnumIter;
18
19#[repr(u8)]
20#[derive(Clone, EnumIter, Debug)]
21pub enum DbKeyPrefix {
22    AcceptedItem = 0x01,
23    AcceptedTransaction = 0x02,
24    SignedSessionOutcome = 0x04,
25    AlephUnits = 0x05,
26    // TODO: do we want to split the server DB into consensus/non-consensus?
27    ApiAnnouncements = 0x06,
28    ServerInfo = 0x07,
29    Module = MODULE_GLOBAL_PREFIX,
30}
31
32impl std::fmt::Display for DbKeyPrefix {
33    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
34        write!(f, "{self:?}")
35    }
36}
37
38#[derive(Clone, Debug, Encodable, Decodable)]
39pub struct AcceptedItemKey(pub u64);
40
41#[derive(Clone, Debug, Encodable, Decodable)]
42pub struct AcceptedItemPrefix;
43
44impl_db_record!(
45    key = AcceptedItemKey,
46    value = AcceptedItem,
47    db_prefix = DbKeyPrefix::AcceptedItem,
48    notify_on_modify = false,
49);
50impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
51
52#[derive(Debug, Encodable, Decodable, Serialize)]
53pub struct AcceptedTransactionKey(pub TransactionId);
54
55#[derive(Debug, Encodable, Decodable)]
56pub struct AcceptedTransactionKeyPrefix;
57
58impl_db_record!(
59    key = AcceptedTransactionKey,
60    value = Vec<ModuleInstanceId>,
61    db_prefix = DbKeyPrefix::AcceptedTransaction,
62    notify_on_modify = true,
63);
64impl_db_lookup!(
65    key = AcceptedTransactionKey,
66    query_prefix = AcceptedTransactionKeyPrefix
67);
68
69#[derive(Debug, Encodable, Decodable)]
70pub struct SignedSessionOutcomeKey(pub u64);
71
72#[derive(Debug, Encodable, Decodable)]
73pub struct SignedSessionOutcomePrefix;
74
75impl_db_record!(
76    key = SignedSessionOutcomeKey,
77    value = SignedSessionOutcome,
78    db_prefix = DbKeyPrefix::SignedSessionOutcome,
79    notify_on_modify = true,
80);
81impl_db_lookup!(
82    key = SignedSessionOutcomeKey,
83    query_prefix = SignedSessionOutcomePrefix
84);
85
86#[derive(Debug, Encodable, Decodable)]
87pub struct AlephUnitsKey(pub u64);
88
89#[derive(Debug, Encodable, Decodable)]
90pub struct AlephUnitsPrefix;
91
92impl_db_record!(
93    key = AlephUnitsKey,
94    value = Vec<u8>,
95    db_prefix = DbKeyPrefix::AlephUnits,
96    notify_on_modify = false,
97);
98impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
99
100pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
101    BTreeMap::new()
102}
103
104pub enum ModuleHistoryItem {
105    ConsensusItem(DynModuleConsensusItem),
106    Input(DynInput),
107    Output(DynOutput),
108}
109
110pub enum TypedModuleHistoryItem<M: ModuleCommon> {
111    ConsensusItem(M::ConsensusItem),
112    Input(M::Input),
113    Output(M::Output),
114}
115
116#[apply(async_trait_maybe_send!)]
117pub trait MigrationContextExt {
118    async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem>;
119
120    async fn get_typed_module_history_stream<M: ModuleCommon>(
121        &mut self,
122    ) -> BoxStream<TypedModuleHistoryItem<M>>;
123}
124
125#[apply(async_trait_maybe_send!)]
126impl MigrationContextExt for MigrationContext<'_> {
127    async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem> {
128        let module_instance_id = self
129            .module_instance_id()
130            .expect("module_instance_id must be set");
131
132        // Items of the currently ongoing session, that have already been processed. We
133        // have to query them in full first and collect them into a vector so we don't
134        // hold two references to the dbtx at the same time.
135        let active_session_items = self
136            .__global_dbtx()
137            .find_by_prefix(&AcceptedItemPrefix)
138            .await
139            .map(|(_, item)| item)
140            .collect::<Vec<_>>()
141            .await;
142
143        let stream = self
144            .__global_dbtx()
145            .find_by_prefix(&SignedSessionOutcomePrefix)
146            .await
147            // Transform the session stream into an accepted item stream
148            .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
149                futures::stream::iter(signed_session_outcome.session_outcome.items)
150            })
151            // Append the accepted items from the current session after all the signed session items
152            // have been processed
153            .chain(futures::stream::iter(active_session_items))
154            .flat_map(move |item| {
155                let history_items = match item.item {
156                    ConsensusItem::Transaction(tx) => tx
157                        .inputs
158                        .into_iter()
159                        .filter_map(|input| {
160                            (input.module_instance_id() == module_instance_id)
161                                .then_some(ModuleHistoryItem::Input(input))
162                        })
163                        .chain(tx.outputs.into_iter().filter_map(|output| {
164                            (output.module_instance_id() == module_instance_id)
165                                .then_some(ModuleHistoryItem::Output(output))
166                        }))
167                        .collect::<Vec<_>>(),
168                    ConsensusItem::Module(mci) => {
169                        if mci.module_instance_id() == module_instance_id {
170                            vec![ModuleHistoryItem::ConsensusItem(mci)]
171                        } else {
172                            vec![]
173                        }
174                    }
175                    ConsensusItem::Default { .. } => {
176                        unreachable!("We never save unknown CIs on the server side")
177                    }
178                };
179                futures::stream::iter(history_items)
180            });
181
182        Box::pin(stream)
183    }
184
185    async fn get_typed_module_history_stream<M: ModuleCommon>(
186        &mut self,
187    ) -> BoxStream<TypedModuleHistoryItem<M>> {
188        Box::pin(self.get_module_history_stream().await.map(|item| {
189            match item {
190                ModuleHistoryItem::ConsensusItem(ci) => TypedModuleHistoryItem::ConsensusItem(
191                    ci.as_any()
192                        .downcast_ref::<M::ConsensusItem>()
193                        .expect("Wrong module type")
194                        .clone(),
195                ),
196                ModuleHistoryItem::Input(input) => TypedModuleHistoryItem::Input(
197                    input
198                        .as_any()
199                        .downcast_ref::<M::Input>()
200                        .expect("Wrong module type")
201                        .clone(),
202                ),
203                ModuleHistoryItem::Output(output) => TypedModuleHistoryItem::Output(
204                    output
205                        .as_any()
206                        .downcast_ref::<M::Output>()
207                        .expect("Wrong module type")
208                        .clone(),
209                ),
210            }
211        }))
212    }
213}