1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::{DynInput, DynModuleConsensusItem, DynOutput, ModuleInstanceId};
5use fedimint_core::db::{
6 CoreMigrationFn, DatabaseVersion, IDatabaseTransactionOpsCoreTyped, MigrationContext,
7 MODULE_GLOBAL_PREFIX,
8};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::epoch::ConsensusItem;
11use fedimint_core::module::ModuleCommon;
12use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
13use fedimint_core::util::BoxStream;
14use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record, TransactionId};
15use futures::StreamExt;
16use serde::Serialize;
17use strum_macros::EnumIter;
18
/// Key prefixes of the database records used by this file.
///
/// Each variant's `u8` discriminant is the value handed to `impl_db_record!`
/// as `db_prefix` by the record declarations in this file. The value `0x03`
/// is not assigned to any variant here.
// NOTE(review): these bytes are presumably persisted as part of every stored
// key, so existing discriminants should not be renumbered — confirm.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the [`AcceptedItemKey`] records.
    AcceptedItem = 0x01,
    /// Prefix of the [`AcceptedTransactionKey`] records.
    AcceptedTransaction = 0x02,
    /// Prefix of the [`SignedSessionOutcomeKey`] records.
    SignedSessionOutcome = 0x04,
    /// Prefix of the [`AlephUnitsKey`] records.
    AlephUnits = 0x05,
    /// No record using this prefix is declared in this file; the tests below
    /// pair it with `ApiAnnouncementPrefix` from `crate::net::api::announcement`.
    ApiAnnouncements = 0x06,
    /// Takes its value from `fedimint_core::db::MODULE_GLOBAL_PREFIX`.
    // NOTE(review): presumably the namespace under which module-owned data
    // lives — confirm against `fedimint_core::db`.
    Module = MODULE_GLOBAL_PREFIX,
}
30
31impl std::fmt::Display for DbKeyPrefix {
32 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
33 write!(f, "{self:?}")
34 }
35}
36
/// Database key of an [`AcceptedItem`] record; wraps a `u64`.
// NOTE(review): the `u64` is presumably the item's position within the
// session it was accepted in — confirm against the code writing these records.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct AcceptedItemKey(pub u64);

/// Field-less lookup prefix registered for [`AcceptedItemKey`]; passing it to
/// `find_by_prefix` is how this file reads the stored [`AcceptedItem`] records.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct AcceptedItemPrefix;

// Declares the `AcceptedItemKey -> AcceptedItem` record stored under
// `DbKeyPrefix::AcceptedItem`, with `notify_on_modify` switched off.
impl_db_record!(
    key = AcceptedItemKey,
    value = AcceptedItem,
    db_prefix = DbKeyPrefix::AcceptedItem,
    notify_on_modify = false,
);
// Registers `AcceptedItemPrefix` as a query prefix for `AcceptedItemKey`.
impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
50
/// Database key of an accepted-transaction record; wraps the transaction's
/// [`TransactionId`].
///
/// The stored value is a `Vec<ModuleInstanceId>`; the test fixture in this
/// file fills it with the module instance ids of the transaction's outputs.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct AcceptedTransactionKey(pub TransactionId);

/// Field-less lookup prefix registered for [`AcceptedTransactionKey`].
#[derive(Debug, Encodable, Decodable)]
pub struct AcceptedTransactionKeyPrefix;

// Declares the `AcceptedTransactionKey -> Vec<ModuleInstanceId>` record stored
// under `DbKeyPrefix::AcceptedTransaction`, with `notify_on_modify` switched on.
impl_db_record!(
    key = AcceptedTransactionKey,
    value = Vec<ModuleInstanceId>,
    db_prefix = DbKeyPrefix::AcceptedTransaction,
    notify_on_modify = true,
);
// Registers `AcceptedTransactionKeyPrefix` as a query prefix for
// `AcceptedTransactionKey`.
impl_db_lookup!(
    key = AcceptedTransactionKey,
    query_prefix = AcceptedTransactionKeyPrefix
);
67
/// Database key of a [`SignedSessionOutcome`] record; wraps a `u64`.
// NOTE(review): the `u64` is presumably the session index — confirm against
// the code writing these records.
#[derive(Debug, Encodable, Decodable)]
pub struct SignedSessionOutcomeKey(pub u64);

/// Field-less lookup prefix registered for [`SignedSessionOutcomeKey`];
/// passing it to `find_by_prefix` is how this file reads the stored
/// [`SignedSessionOutcome`] records.
#[derive(Debug, Encodable, Decodable)]
pub struct SignedSessionOutcomePrefix;

// Declares the `SignedSessionOutcomeKey -> SignedSessionOutcome` record stored
// under `DbKeyPrefix::SignedSessionOutcome`, with `notify_on_modify` switched on.
impl_db_record!(
    key = SignedSessionOutcomeKey,
    value = SignedSessionOutcome,
    db_prefix = DbKeyPrefix::SignedSessionOutcome,
    notify_on_modify = true,
);
// Registers `SignedSessionOutcomePrefix` as a query prefix for
// `SignedSessionOutcomeKey`.
impl_db_lookup!(
    key = SignedSessionOutcomeKey,
    query_prefix = SignedSessionOutcomePrefix
);
84
/// Database key of an aleph-units record; wraps a `u64`. The stored value is
/// an opaque `Vec<u8>`.
// NOTE(review): the meaning of the `u64` and the layout of the byte payload
// are not visible in this file — confirm against the consensus code that
// writes these records.
#[derive(Debug, Encodable, Decodable)]
pub struct AlephUnitsKey(pub u64);

/// Field-less lookup prefix registered for [`AlephUnitsKey`].
#[derive(Debug, Encodable, Decodable)]
pub struct AlephUnitsPrefix;

// Declares the `AlephUnitsKey -> Vec<u8>` record stored under
// `DbKeyPrefix::AlephUnits`, with `notify_on_modify` switched off.
impl_db_record!(
    key = AlephUnitsKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::AlephUnits,
    notify_on_modify = false,
);
// Registers `AlephUnitsPrefix` as a query prefix for `AlephUnitsKey`.
impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
98
99pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
100 BTreeMap::new()
101}
102
/// A single entry of one module's history in type-erased (`Dyn*`) form, as
/// yielded by `MigrationContextExt::get_module_history_stream`.
pub enum ModuleHistoryItem {
    /// A module consensus item whose module instance id matched the module.
    ConsensusItem(DynModuleConsensusItem),
    /// A transaction input whose module instance id matched the module.
    Input(DynInput),
    /// A transaction output whose module instance id matched the module.
    Output(DynOutput),
}
108
/// Counterpart of [`ModuleHistoryItem`] that carries the concrete associated
/// types of module `M` instead of the type-erased `Dyn*` wrappers, as yielded
/// by `MigrationContextExt::get_typed_module_history_stream`.
pub enum TypedModuleHistoryItem<M: ModuleCommon> {
    /// A consensus item of module `M`.
    ConsensusItem(M::ConsensusItem),
    /// A transaction input of module `M`.
    Input(M::Input),
    /// A transaction output of module `M`.
    Output(M::Output),
}
114
/// Extension methods that expose a module's history as a stream; implemented
/// in this file for `MigrationContext<'_>`.
#[apply(async_trait_maybe_send!)]
pub trait MigrationContextExt {
    /// Returns a stream of type-erased history items ([`ModuleHistoryItem`]).
    async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem>;

    /// Returns a stream of history items expressed in the concrete types of
    /// module `M` ([`TypedModuleHistoryItem`]).
    async fn get_typed_module_history_stream<M: ModuleCommon>(
        &mut self,
    ) -> BoxStream<TypedModuleHistoryItem<M>>;
}
123
124#[apply(async_trait_maybe_send!)]
125impl MigrationContextExt for MigrationContext<'_> {
126 async fn get_module_history_stream(&mut self) -> BoxStream<ModuleHistoryItem> {
127 let module_instance_id = self
128 .module_instance_id()
129 .expect("module_instance_id must be set");
130
131 let active_session_items = self
135 .__global_dbtx()
136 .find_by_prefix(&AcceptedItemPrefix)
137 .await
138 .map(|(_, item)| item)
139 .collect::<Vec<_>>()
140 .await;
141
142 let stream = self
143 .__global_dbtx()
144 .find_by_prefix(&SignedSessionOutcomePrefix)
145 .await
146 .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
148 futures::stream::iter(signed_session_outcome.session_outcome.items)
149 })
150 .chain(futures::stream::iter(active_session_items))
153 .flat_map(move |item| {
154 let history_items = match item.item {
155 ConsensusItem::Transaction(tx) => tx
156 .inputs
157 .into_iter()
158 .filter_map(|input| {
159 (input.module_instance_id() == module_instance_id)
160 .then_some(ModuleHistoryItem::Input(input))
161 })
162 .chain(tx.outputs.into_iter().filter_map(|output| {
163 (output.module_instance_id() == module_instance_id)
164 .then_some(ModuleHistoryItem::Output(output))
165 }))
166 .collect::<Vec<_>>(),
167 ConsensusItem::Module(mci) => {
168 if mci.module_instance_id() == module_instance_id {
169 vec![ModuleHistoryItem::ConsensusItem(mci)]
170 } else {
171 vec![]
172 }
173 }
174 ConsensusItem::Default { .. } => {
175 unreachable!("We never save unknown CIs on the server side")
176 }
177 };
178 futures::stream::iter(history_items)
179 });
180
181 Box::pin(stream)
182 }
183
184 async fn get_typed_module_history_stream<M: ModuleCommon>(
185 &mut self,
186 ) -> BoxStream<TypedModuleHistoryItem<M>> {
187 Box::pin(self.get_module_history_stream().await.map(|item| {
188 match item {
189 ModuleHistoryItem::ConsensusItem(ci) => TypedModuleHistoryItem::ConsensusItem(
190 ci.as_any()
191 .downcast_ref::<M::ConsensusItem>()
192 .expect("Wrong module type")
193 .clone(),
194 ),
195 ModuleHistoryItem::Input(input) => TypedModuleHistoryItem::Input(
196 input
197 .as_any()
198 .downcast_ref::<M::Input>()
199 .expect("Wrong module type")
200 .clone(),
201 ),
202 ModuleHistoryItem::Output(output) => TypedModuleHistoryItem::Output(
203 output
204 .as_any()
205 .downcast_ref::<M::Output>()
206 .expect("Wrong module type")
207 .clone(),
208 ),
209 }
210 }))
211 }
212}
213
/// Snapshot and validation tests for the server's global database migrations.
#[cfg(test)]
mod fedimint_migration_tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;

    use anyhow::ensure;
    use bitcoin::key::Keypair;
    use bitcoin::secp256k1;
    use fedimint_core::core::{DynInput, DynOutput};
    use fedimint_core::db::{
        Database, DatabaseVersion, DatabaseVersionKeyV0, IDatabaseTransactionOpsCoreTyped,
    };
    use fedimint_core::epoch::ConsensusItem;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_core::module::CommonModuleInit;
    use fedimint_core::net::api_announcement::{ApiAnnouncement, SignedApiAnnouncement};
    use fedimint_core::session_outcome::{SessionOutcome, SignedSessionOutcome};
    use fedimint_core::transaction::{Transaction, TransactionSignature};
    use fedimint_core::{Amount, BitcoinHash, PeerId, ServerModule, TransactionId};
    use fedimint_dummy_common::{DummyCommonInit, DummyInput, DummyOutput};
    use fedimint_dummy_server::Dummy;
    use fedimint_logging::{TracingSetup, LOG_DB};
    use fedimint_testing_core::db::{
        snapshot_db_migrations_with_decoders, validate_migrations_global, BYTE_32,
        TEST_MODULE_INSTANCE_ID,
    };
    use futures::StreamExt;
    use rand::rngs::OsRng;
    use rand::thread_rng;
    use secp256k1::Message;
    use strum::IntoEnumIterator;
    use tracing::info;

    use super::{
        get_global_database_migrations, AcceptedItem, AcceptedItemKey, AcceptedItemPrefix,
        AcceptedTransactionKey, AcceptedTransactionKeyPrefix, AlephUnitsKey, AlephUnitsPrefix,
        DbKeyPrefix, SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
    };
    use crate::net::api::announcement::{ApiAnnouncementKey, ApiAnnouncementPrefix};

    /// Fills `db` with a version-0 marker (`DatabaseVersionKeyV0`) and exactly
    /// one record of each kind checked by `test_server_db_migrations`: an
    /// accepted transaction, an accepted item, a signed session outcome, an
    /// aleph unit and an API announcement. Nothing is written under
    /// `DbKeyPrefix::Module`.
    async fn create_server_db_with_v0_data(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
            .await;

        let accepted_tx_id = AcceptedTransactionKey(TransactionId::from_slice(&BYTE_32).unwrap());

        // A fresh key pair and a Schnorr signature over `BYTE_32`, used only
        // to give the transaction a well-formed account key and signature.
        let (sk, _) = secp256k1::generate_keypair(&mut OsRng);
        let secp = secp256k1::Secp256k1::new();
        let key_pair = Keypair::from_secret_key(&secp, &sk);
        let schnorr = secp.sign_schnorr_with_rng(
            &Message::from_digest_slice(&BYTE_32).unwrap(),
            &key_pair,
            &mut thread_rng(),
        );
        let transaction = Transaction {
            inputs: vec![DynInput::from_typed(
                0,
                DummyInput {
                    amount: Amount::ZERO,
                    account: key_pair.public_key(),
                },
            )],
            outputs: vec![DynOutput::from_typed(
                0,
                DummyOutput {
                    amount: Amount::ZERO,
                    account: key_pair.public_key(),
                },
            )],
            nonce: [0x42; 8],
            signatures: TransactionSignature::NaiveMultisig(vec![schnorr]),
        };

        // The accepted-transaction record stores the module instance ids of
        // the transaction's outputs.
        let module_ids = transaction
            .outputs
            .iter()
            .map(DynOutput::module_instance_id)
            .collect::<Vec<_>>();

        dbtx.insert_new_entry(&accepted_tx_id, &module_ids).await;

        dbtx.insert_new_entry(
            &AcceptedItemKey(0),
            &AcceptedItem {
                item: ConsensusItem::Transaction(transaction.clone()),
                peer: PeerId::from_str("0").unwrap(),
            },
        )
        .await;

        dbtx.insert_new_entry(
            &SignedSessionOutcomeKey(0),
            &SignedSessionOutcome {
                session_outcome: SessionOutcome { items: Vec::new() },
                signatures: BTreeMap::new(),
            },
        )
        .await;

        dbtx.insert_new_entry(&AlephUnitsKey(0), &vec![42, 42, 42])
            .await;

        dbtx.insert_new_entry(
            &ApiAnnouncementKey(PeerId::from(42)),
            &SignedApiAnnouncement {
                api_announcement: ApiAnnouncement {
                    api_url: "wss://foo.bar".parse().expect("valid url"),
                    nonce: 0,
                },
                signature: secp256k1::schnorr::Signature::from_slice(&[42; 64]).unwrap(),
            },
        )
        .await;

        dbtx.commit_tx().await;
    }

    /// Hands the version-0 fixture to `snapshot_db_migrations_with_decoders`
    /// under the name "fedimint-server", with the dummy module's decoder
    /// registered for `TEST_MODULE_INSTANCE_ID`.
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_server_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-server",
            |db| {
                Box::pin(async {
                    create_server_db_with_v0_data(db).await;
                })
            },
            ModuleDecoderRegistry::from_iter([(
                TEST_MODULE_INSTANCE_ID,
                DummyCommonInit::KIND,
                <Dummy as ServerModule>::decoder(),
            )]),
        )
        .await
    }

    /// Runs `validate_migrations_global` for "fedimint-server" and checks, for
    /// every `DbKeyPrefix`, that the expected records can be read back.
    ///
    /// Every check reports failure through `ensure!`, so a failing prefix
    /// surfaces as an `Err` from the validation closure rather than a panic.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_server_db_migrations() -> anyhow::Result<()> {
        let _ = TracingSetup::default().init();

        validate_migrations_global(
            |db| async move {
                let mut dbtx = db.begin_transaction_nc().await;

                for prefix in DbKeyPrefix::iter() {
                    match prefix {
                        DbKeyPrefix::AcceptedItem => {
                            let accepted_items = dbtx
                                .find_by_prefix(&AcceptedItemPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_accepted_items = accepted_items.len();
                            ensure!(
                                num_accepted_items > 0,
                                "validate_migrations was not able to read any AcceptedItems"
                            );
                            info!(target: LOG_DB, "Validated AcceptedItems");
                        }
                        DbKeyPrefix::AcceptedTransaction => {
                            let accepted_transactions = dbtx
                                .find_by_prefix(&AcceptedTransactionKeyPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_accepted_transactions = accepted_transactions.len();
                            ensure!(
                                num_accepted_transactions > 0,
                                "validate_migrations was not able to read any AcceptedTransactions"
                            );
                            info!(target: LOG_DB, "Validated AcceptedTransactions");
                        }
                        DbKeyPrefix::SignedSessionOutcome => {
                            let signed_session_outcomes = dbtx
                                .find_by_prefix(&SignedSessionOutcomePrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_signed_session_outcomes = signed_session_outcomes.len();
                            ensure!(
                                num_signed_session_outcomes > 0,
                                "validate_migrations was not able to read any SignedSessionOutcomes"
                            );
                            info!(target: LOG_DB, "Validated SignedSessionOutcome");
                        }
                        DbKeyPrefix::AlephUnits => {
                            let aleph_units = dbtx
                                .find_by_prefix(&AlephUnitsPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_aleph_units = aleph_units.len();
                            ensure!(
                                num_aleph_units > 0,
                                "validate_migrations was not able to read any AlephUnits"
                            );
                            info!(target: LOG_DB, "Validated AlephUnits");
                        }
                        DbKeyPrefix::ApiAnnouncements => {
                            let announcements = dbtx
                                .find_by_prefix(&ApiAnnouncementPrefix)
                                .await
                                .collect::<Vec<_>>()
                                .await;
                            let num_announcements = announcements.len();
                            // The fixture inserts exactly one announcement.
                            ensure!(
                                num_announcements == 1,
                                "validate_migrations expected exactly one ApiAnnouncement, found {}",
                                num_announcements
                            );
                            info!(target: LOG_DB, "Validated ApiAnnouncements");
                        }
                        // The fixture writes nothing under the module prefix,
                        // so there is nothing to validate here.
                        DbKeyPrefix::Module => {}
                    }
                }
                Ok(())
            },
            "fedimint-server",
            get_global_database_migrations(),
            ModuleDecoderRegistry::from_iter([(
                TEST_MODULE_INSTANCE_ID,
                DummyCommonInit::KIND,
                <Dummy as ServerModule>::decoder(),
            )]),
        )
        .await
    }
}