fedimint_server/consensus/db.rs

use std::collections::BTreeMap;
use std::fmt::Debug;

use fedimint_core::core::{DynInput, DynModuleConsensusItem, DynOutput, ModuleInstanceId};
use fedimint_core::db::{
    CoreMigrationFn, DatabaseVersion, IDatabaseTransactionOpsCoreTyped, MigrationContext,
    MODULE_GLOBAL_PREFIX,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::ModuleCommon;
use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
use fedimint_core::util::BoxStream;
use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record, TransactionId};
use futures::StreamExt;
use serde::Serialize;
use strum_macros::EnumIter;

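/// Top-level key prefixes of the server's consensus database. Module data is
/// namespaced under [`MODULE_GLOBAL_PREFIX`].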
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    AcceptedItem = 0x01,
    AcceptedTransaction = 0x02,
    SignedSessionOutcome = 0x04,
    AlephUnits = 0x05,
    // TODO: do we want to split the server DB into consensus/non-consensus?
    ApiAnnouncements = 0x06,
    Module = MODULE_GLOBAL_PREFIX,
}

impl std::fmt::Display for DbKeyPrefix {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

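/// Consensus item accepted in the currently open session, keyed by its index
/// within the session.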
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct AcceptedItemKey(pub u64);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct AcceptedItemPrefix;

impl_db_record!(
    key = AcceptedItemKey,
    value = AcceptedItem,
    db_prefix = DbKeyPrefix::AcceptedItem,
    notify_on_modify = false,
);
impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);

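/// Marks a transaction as accepted; keyed by [`TransactionId`], with the
/// module instance ids of the transaction's outputs as the value.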
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct AcceptedTransactionKey(pub TransactionId);

#[derive(Debug, Encodable, Decodable)]
pub struct AcceptedTransactionKeyPrefix;

impl_db_record!(
    key = AcceptedTransactionKey,
    value = Vec<ModuleInstanceId>,
    db_prefix = DbKeyPrefix::AcceptedTransaction,
    notify_on_modify = true,
);
impl_db_lookup!(
    key = AcceptedTransactionKey,
    query_prefix = AcceptedTransactionKeyPrefix
);

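/// Threshold-signed outcome of a completed session, keyed by session index.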
#[derive(Debug, Encodable, Decodable)]
pub struct SignedSessionOutcomeKey(pub u64);

#[derive(Debug, Encodable, Decodable)]
pub struct SignedSessionOutcomePrefix;

impl_db_record!(
    key = SignedSessionOutcomeKey,
    value = SignedSessionOutcome,
    db_prefix = DbKeyPrefix::SignedSessionOutcome,
    notify_on_modify = true,
);
impl_db_lookup!(
    key = SignedSessionOutcomeKey,
    query_prefix = SignedSessionOutcomePrefix
);

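/// Raw AlephBFT unit backup for the in-progress session, keyed by insertion
/// order, so the session can be resumed after a restart.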
#[derive(Debug, Encodable, Decodable)]
pub struct AlephUnitsKey(pub u64);

#[derive(Debug, Encodable, Decodable)]
pub struct AlephUnitsPrefix;

impl_db_record!(
    key = AlephUnitsKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::AlephUnits,
    notify_on_modify = false,
);
impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);

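/// Returns the migrations to apply to the global (non-module) part of the
/// server database, keyed by database version. Currently there are none.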
pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
    BTreeMap::new()
}

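/// Single item of a module's consensus history in type-erased form.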
pub enum ModuleHistoryItem {
    ConsensusItem(DynModuleConsensusItem),
    Input(DynInput),
    Output(DynOutput),
}

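/// Single item of a module's consensus history, downcast to the concrete
/// types of module `M`.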
pub enum TypedModuleHistoryItem<M: ModuleCommon> {
    ConsensusItem(M::ConsensusItem),
    Input(M::Input),
    Output(M::Output),
}

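/// Extension trait for [`MigrationContext`] that lets module database
/// migrations replay the module's full consensus history: items from all
/// signed sessions followed by the accepted items of the currently open
/// session.
///
/// A minimal usage sketch; the surrounding function and `MyModuleTypes` are
/// illustrative placeholders, not the actual migration API:
///
/// ```ignore
/// use futures::StreamExt;
///
/// async fn reindex(mut ctx: MigrationContext<'_>) -> anyhow::Result<()> {
///     let mut history = ctx
///         .get_typed_module_history_stream::<MyModuleTypes>()
///         .await;
///     while let Some(item) = history.next().await {
///         match item {
///             TypedModuleHistoryItem::Input(input) => { /* re-index the input */ }
///             TypedModuleHistoryItem::Output(output) => { /* re-index the output */ }
///             TypedModuleHistoryItem::ConsensusItem(_ci) => {}
///         }
///     }
///     Ok(())
/// }
/// ```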
#[apply(async_trait_maybe_send!)]
pub trait MigrationContextExt {
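    /// Streams every accepted consensus item, input, and output that belongs
    /// to the module instance the migration is running for.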
    async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem>;

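    /// Like [`Self::get_module_history_stream`], but with each item downcast
    /// to the concrete types of module `M`.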
    async fn get_typed_module_history_stream<M: ModuleCommon>(
        &mut self,
    ) -> BoxStream<TypedModuleHistoryItem<M>>;
}

#[apply(async_trait_maybe_send!)]
impl MigrationContextExt for MigrationContext<'_> {
    async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem> {
        let module_instance_id = self
            .module_instance_id()
            .expect("module_instance_id must be set");

        // Items of the currently ongoing session that have already been processed. We
        // have to query them in full first and collect them into a vector so we don't
        // hold two references to the dbtx at the same time.
        let active_session_items = self
            .__global_dbtx()
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|(_, item)| item)
            .collect::<Vec<_>>()
            .await;

        let stream = self
            .__global_dbtx()
            .find_by_prefix(&SignedSessionOutcomePrefix)
            .await
            // Transform the session stream into an accepted item stream
            .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
                futures::stream::iter(signed_session_outcome.session_outcome.items)
            })
            // Append the accepted items from the current session after all the signed session items
            // have been processed
            .chain(futures::stream::iter(active_session_items))
            .flat_map(move |item| {
                let history_items = match item.item {
                    ConsensusItem::Transaction(tx) => tx
                        .inputs
                        .into_iter()
                        .filter_map(|input| {
                            (input.module_instance_id() == module_instance_id)
                                .then_some(ModuleHistoryItem::Input(input))
                        })
                        .chain(tx.outputs.into_iter().filter_map(|output| {
                            (output.module_instance_id() == module_instance_id)
                                .then_some(ModuleHistoryItem::Output(output))
                        }))
                        .collect::<Vec<_>>(),
                    ConsensusItem::Module(mci) => {
                        if mci.module_instance_id() == module_instance_id {
                            vec![ModuleHistoryItem::ConsensusItem(mci)]
                        } else {
                            vec![]
                        }
                    }
                    ConsensusItem::Default { .. } => {
                        unreachable!("We never save unknown CIs on the server side")
                    }
                };
                futures::stream::iter(history_items)
            });

        Box::pin(stream)
    }

    async fn get_typed_module_history_stream<M: ModuleCommon>(
        &mut self,
    ) -> BoxStream<TypedModuleHistoryItem<M>> {
        Box::pin(self.get_module_history_stream().await.map(|item| {
            match item {
                ModuleHistoryItem::ConsensusItem(ci) => TypedModuleHistoryItem::ConsensusItem(
                    ci.as_any()
                        .downcast_ref::<M::ConsensusItem>()
                        .expect("Wrong module type")
                        .clone(),
                ),
                ModuleHistoryItem::Input(input) => TypedModuleHistoryItem::Input(
                    input
                        .as_any()
                        .downcast_ref::<M::Input>()
                        .expect("Wrong module type")
                        .clone(),
                ),
                ModuleHistoryItem::Output(output) => TypedModuleHistoryItem::Output(
                    output
                        .as_any()
                        .downcast_ref::<M::Output>()
                        .expect("Wrong module type")
                        .clone(),
                ),
            }
        }))
    }
}

#[cfg(test)]
mod fedimint_migration_tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;

    use anyhow::ensure;
    use bitcoin::key::Keypair;
    use bitcoin::secp256k1;
    use fedimint_core::core::{DynInput, DynOutput};
    use fedimint_core::db::{
        Database, DatabaseVersion, DatabaseVersionKeyV0, IDatabaseTransactionOpsCoreTyped,
    };
    use fedimint_core::epoch::ConsensusItem;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_core::module::CommonModuleInit;
    use fedimint_core::net::api_announcement::{ApiAnnouncement, SignedApiAnnouncement};
    use fedimint_core::session_outcome::{SessionOutcome, SignedSessionOutcome};
    use fedimint_core::transaction::{Transaction, TransactionSignature};
    use fedimint_core::{Amount, BitcoinHash, PeerId, ServerModule, TransactionId};
    use fedimint_dummy_common::{DummyCommonInit, DummyInput, DummyOutput};
    use fedimint_dummy_server::Dummy;
    use fedimint_logging::{TracingSetup, LOG_DB};
    use fedimint_testing_core::db::{
        snapshot_db_migrations_with_decoders, validate_migrations_global, BYTE_32,
        TEST_MODULE_INSTANCE_ID,
    };
    use futures::StreamExt;
    use rand::rngs::OsRng;
    use rand::thread_rng;
    use secp256k1::Message;
    use strum::IntoEnumIterator;
    use tracing::info;

    use super::{
        get_global_database_migrations, AcceptedItem, AcceptedItemKey, AcceptedItemPrefix,
        AcceptedTransactionKey, AcceptedTransactionKeyPrefix, AlephUnitsKey, AlephUnitsPrefix,
        DbKeyPrefix, SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
    };
    use crate::net::api::announcement::{ApiAnnouncementKey, ApiAnnouncementPrefix};

    /// Create a database with version 0 data. The database produced is not
    /// intended to be real data or semantically correct. It is only
    /// intended to provide coverage when reading the database
    /// in future code versions. This function should not be updated when
    /// database keys/values change - instead a new function should be added
    /// that creates a new database backup that can be tested.
    async fn create_server_db_with_v0_data(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        // Will be migrated to `DatabaseVersionKey` during `apply_migrations`
        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
            .await;

        let accepted_tx_id = AcceptedTransactionKey(TransactionId::from_slice(&BYTE_32).unwrap());

        let (sk, _) = secp256k1::generate_keypair(&mut OsRng);
        let secp = secp256k1::Secp256k1::new();
        let key_pair = Keypair::from_secret_key(&secp, &sk);
        let schnorr = secp.sign_schnorr_with_rng(
            &Message::from_digest_slice(&BYTE_32).unwrap(),
            &key_pair,
            &mut thread_rng(),
        );
        let transaction = Transaction {
            inputs: vec![DynInput::from_typed(
                0,
                DummyInput {
                    amount: Amount::ZERO,
                    account: key_pair.public_key(),
                },
            )],
            outputs: vec![DynOutput::from_typed(
                0,
                DummyOutput {
                    amount: Amount::ZERO,
                    account: key_pair.public_key(),
                },
            )],
            nonce: [0x42; 8],
            signatures: TransactionSignature::NaiveMultisig(vec![schnorr]),
        };

        let module_ids = transaction
            .outputs
            .iter()
            .map(DynOutput::module_instance_id)
            .collect::<Vec<_>>();

        dbtx.insert_new_entry(&accepted_tx_id, &module_ids).await;

        dbtx.insert_new_entry(
            &AcceptedItemKey(0),
            &AcceptedItem {
                item: ConsensusItem::Transaction(transaction.clone()),
                peer: PeerId::from_str("0").unwrap(),
            },
        )
        .await;

        dbtx.insert_new_entry(
            &SignedSessionOutcomeKey(0),
            &SignedSessionOutcome {
                session_outcome: SessionOutcome { items: Vec::new() },
                signatures: BTreeMap::new(),
            },
        )
        .await;

        dbtx.insert_new_entry(&AlephUnitsKey(0), &vec![42, 42, 42])
            .await;

        dbtx.insert_new_entry(
            &ApiAnnouncementKey(PeerId::from(42)),
            &SignedApiAnnouncement {
                api_announcement: ApiAnnouncement {
                    api_url: "wss://foo.bar".parse().expect("valid url"),
                    nonce: 0,
                },
                signature: secp256k1::schnorr::Signature::from_slice(&[42; 64]).unwrap(),
            },
        )
        .await;

        dbtx.commit_tx().await;
    }

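    /// Snapshots a v0 `fedimint-server` database so future migrations can be
    /// validated against real historical data.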
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_server_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-server",
            |db| {
                Box::pin(async {
                    create_server_db_with_v0_data(db).await;
                })
            },
            ModuleDecoderRegistry::from_iter([(
                TEST_MODULE_INSTANCE_ID,
                DummyCommonInit::KIND,
                <Dummy as ServerModule>::decoder(),
            )]),
        )
        .await
    }

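    /// Applies all global database migrations to the snapshot databases and
    /// verifies that every key prefix can still be read and decoded.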
    #[tokio::test(flavor = "multi_thread")]
    async fn test_server_db_migrations() -> anyhow::Result<()> {
        let _ = TracingSetup::default().init();

        validate_migrations_global(
            |db| async move {
                let mut dbtx = db.begin_transaction_nc().await;

                for prefix in DbKeyPrefix::iter() {
                    match prefix {
                        DbKeyPrefix::AcceptedItem => {
                            let accepted_items = dbtx
                                .find_by_prefix(&AcceptedItemPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_accepted_items = accepted_items.len();
                            ensure!(
                                num_accepted_items > 0,
                                "validate_migrations was not able to read any AcceptedItems"
                            );
                            info!(target: LOG_DB, "Validated AcceptedItems");
                        }
                        DbKeyPrefix::AcceptedTransaction => {
                            let accepted_transactions = dbtx
                                .find_by_prefix(&AcceptedTransactionKeyPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_accepted_transactions = accepted_transactions.len();
                            ensure!(
                                num_accepted_transactions > 0,
                                "validate_migrations was not able to read any AcceptedTransactions"
                            );
                            info!(target: LOG_DB, "Validated AcceptedTransactions");
                        }
                        DbKeyPrefix::SignedSessionOutcome => {
                            let signed_session_outcomes = dbtx
                                .find_by_prefix(&SignedSessionOutcomePrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_signed_session_outcomes = signed_session_outcomes.len();
                            ensure!(
                                num_signed_session_outcomes > 0,
                                "validate_migrations was not able to read any SignedSessionOutcomes"
                            );
                            info!(target: LOG_DB, "Validated SignedSessionOutcomes");
                        }
                        DbKeyPrefix::AlephUnits => {
                            let aleph_units = dbtx
                                .find_by_prefix(&AlephUnitsPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_aleph_units = aleph_units.len();
                            ensure!(
                                num_aleph_units > 0,
                                "validate_migrations was not able to read any AlephUnits"
                            );
                            info!(target: LOG_DB, "Validated AlephUnits");
                        }
                        // Module prefix is reserved for modules, no migration testing is needed
                        DbKeyPrefix::Module => {}
                        DbKeyPrefix::ApiAnnouncements => {
                            let announcements = dbtx
                                .find_by_prefix(&ApiAnnouncementPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;

                            assert_eq!(announcements.len(), 1);
                        }
                    }
                }
                Ok(())
            },
            "fedimint-server",
            get_global_database_migrations(),
            ModuleDecoderRegistry::from_iter([(
                TEST_MODULE_INSTANCE_ID,
                DummyCommonInit::KIND,
                <Dummy as ServerModule>::decoder(),
            )]),
        )
        .await
    }
}