1pub mod event_log;
2
3use std::collections::BTreeMap;
4use std::io::Cursor;
5use std::time::SystemTime;
6
7use fedimint_api_client::api::ApiVersionSet;
8use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
9use fedimint_core::core::{ModuleInstanceId, OperationId};
10use fedimint_core::db::{
11 apply_migrations, create_database_version, get_current_database_version, CoreMigrationFn,
12 Database, DatabaseTransaction, DatabaseValue, DatabaseVersion, DatabaseVersionKey,
13 IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
14};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::module::registry::ModuleDecoderRegistry;
17use fedimint_core::module::SupportedApiVersionsSummary;
18use fedimint_core::util::BoxFuture;
19use fedimint_core::{impl_db_lookup, impl_db_record, PeerId};
20use fedimint_logging::LOG_CLIENT_DB;
21use futures::StreamExt;
22use serde::{Deserialize, Serialize};
23use strum_macros::EnumIter;
24use tracing::{debug, info, trace, warn};
25
26use crate::backup::{ClientBackup, Metadata};
27use crate::module::recovery::RecoveryProgress;
28use crate::oplog::OperationLogEntry;
29use crate::sm::executor::{
30 ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKeyBytes,
31 InactiveStateKeyPrefixBytes,
32};
33use crate::sm::{ActiveStateMeta, InactiveStateMeta};
34
35#[repr(u8)]
36#[derive(Clone, EnumIter, Debug)]
37pub enum DbKeyPrefix {
38 EncodedClientSecret = 0x28,
39 ClientSecret = 0x29, ClientPreRootSecretHash = 0x2a,
41 OperationLog = 0x2c,
42 ChronologicalOperationLog = 0x2d,
43 CommonApiVersionCache = 0x2e,
44 ClientConfig = 0x2f,
45 ClientInviteCode = 0x30, ClientInitState = 0x31,
47 ClientMetadata = 0x32,
48 ClientLastBackup = 0x33,
49 ClientMetaField = 0x34,
50 ClientMetaServiceInfo = 0x35,
51 ApiSecret = 0x36,
52 PeerLastApiVersionsSummaryCache = 0x37,
53 ApiUrlAnnouncement = 0x38,
54 EventLog = 0x39,
55 UnorderedEventLog = 0x3a,
56
57 UserData = 0xb0,
67 ExternalReservedStart = 0xb1,
70 ExternalReservedEnd = 0xcf,
73 InternalReservedStart = 0xd0,
75 ModuleGlobalPrefix = 0xff,
77}
78
79impl std::fmt::Display for DbKeyPrefix {
80 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
81 write!(f, "{self:?}")
82 }
83}
84
85#[derive(Debug, Encodable, Decodable)]
86pub struct EncodedClientSecretKey;
87
88#[derive(Debug, Encodable, Decodable)]
89pub struct EncodedClientSecretKeyPrefix;
90
91impl_db_record!(
92 key = EncodedClientSecretKey,
93 value = Vec<u8>,
94 db_prefix = DbKeyPrefix::EncodedClientSecret,
95);
96impl_db_lookup!(
97 key = EncodedClientSecretKey,
98 query_prefix = EncodedClientSecretKeyPrefix
99);
100
101#[derive(Debug, Encodable, Decodable, Serialize)]
102pub struct OperationLogKey {
103 pub operation_id: OperationId,
104}
105
106impl_db_record!(
107 key = OperationLogKey,
108 value = OperationLogEntry,
109 db_prefix = DbKeyPrefix::OperationLog
110);
111
112#[derive(Debug, Encodable, Decodable, Serialize)]
113pub struct ClientPreRootSecretHashKey;
114
115impl_db_record!(
116 key = ClientPreRootSecretHashKey,
117 value = [u8; 8],
118 db_prefix = DbKeyPrefix::ClientPreRootSecretHash
119);
120
121#[derive(Debug, Clone, Copy, Encodable, Decodable, Serialize)]
123pub struct ChronologicalOperationLogKey {
124 pub creation_time: std::time::SystemTime,
125 pub operation_id: OperationId,
126}
127
128#[derive(Debug, Encodable)]
129pub struct ChronologicalOperationLogKeyPrefix;
130
131impl_db_record!(
132 key = ChronologicalOperationLogKey,
133 value = (),
134 db_prefix = DbKeyPrefix::ChronologicalOperationLog
135);
136
137impl_db_lookup!(
138 key = ChronologicalOperationLogKey,
139 query_prefix = ChronologicalOperationLogKeyPrefix
140);
141
142#[derive(Debug, Encodable, Decodable)]
143pub struct CachedApiVersionSetKey;
144
145#[derive(Debug, Encodable, Decodable)]
146pub struct CachedApiVersionSet(pub ApiVersionSet);
147
148impl_db_record!(
149 key = CachedApiVersionSetKey,
150 value = CachedApiVersionSet,
151 db_prefix = DbKeyPrefix::CommonApiVersionCache
152);
153
154#[derive(Debug, Encodable, Decodable)]
155pub struct PeerLastApiVersionsSummaryKey(pub PeerId);
156
157#[derive(Debug, Encodable, Decodable)]
158pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);
159
160impl_db_record!(
161 key = PeerLastApiVersionsSummaryKey,
162 value = PeerLastApiVersionsSummary,
163 db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
164);
165
166#[derive(Debug, Encodable, Decodable, Serialize)]
167pub struct ClientConfigKey;
168
169impl_db_record!(
170 key = ClientConfigKey,
171 value = ClientConfig,
172 db_prefix = DbKeyPrefix::ClientConfig
173);
174
175#[derive(Debug, Encodable, Decodable, Serialize)]
176pub struct ClientConfigKeyV0 {
177 pub id: FederationId,
178}
179
180#[derive(Debug, Encodable)]
181pub struct ClientConfigKeyPrefixV0;
182
183impl_db_record!(
184 key = ClientConfigKeyV0,
185 value = ClientConfigV0,
186 db_prefix = DbKeyPrefix::ClientConfig
187);
188
189impl_db_lookup!(
190 key = ClientConfigKeyV0,
191 query_prefix = ClientConfigKeyPrefixV0
192);
193
194#[derive(Debug, Encodable, Decodable, Serialize)]
195pub struct ApiSecretKey;
196
197#[derive(Debug, Encodable)]
198pub struct ApiSecretKeyPrefix;
199
200impl_db_record!(
201 key = ApiSecretKey,
202 value = String,
203 db_prefix = DbKeyPrefix::ApiSecret
204);
205
206impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
207
208#[derive(Debug, Encodable, Decodable, Serialize)]
210pub struct ClientMetadataKey;
211
212#[derive(Debug, Encodable)]
213pub struct ClientMetadataPrefix;
214
215impl_db_record!(
216 key = ClientMetadataKey,
217 value = Metadata,
218 db_prefix = DbKeyPrefix::ClientMetadata
219);
220
221impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
222
223#[derive(Debug, Encodable, Decodable, Serialize)]
225pub struct ClientInitStateKey;
226
227#[derive(Debug, Encodable)]
228pub struct ClientInitStatePrefix;
229
230#[derive(Debug, Encodable, Decodable)]
232pub enum InitMode {
233 Fresh,
235 Recover { snapshot: Option<ClientBackup> },
238}
239
240#[derive(Debug, Encodable, Decodable)]
246pub enum InitModeComplete {
247 Fresh,
248 Recover,
249}
250
251#[derive(Debug, Encodable, Decodable)]
253pub enum InitState {
254 Pending(InitMode),
257 Complete(InitModeComplete),
259}
260
261impl InitState {
262 pub fn into_complete(self) -> Self {
263 match self {
264 InitState::Pending(p) => InitState::Complete(match p {
265 InitMode::Fresh => InitModeComplete::Fresh,
266 InitMode::Recover { .. } => InitModeComplete::Recover,
267 }),
268 InitState::Complete(t) => InitState::Complete(t),
269 }
270 }
271
272 pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
273 match self {
274 InitState::Pending(p) => match p {
275 InitMode::Fresh => None,
276 InitMode::Recover { snapshot } => Some(snapshot.clone()),
277 },
278 InitState::Complete(_) => None,
279 }
280 }
281
282 pub fn is_pending(&self) -> bool {
283 match self {
284 InitState::Pending(_) => true,
285 InitState::Complete(_) => false,
286 }
287 }
288}
289
290impl_db_record!(
291 key = ClientInitStateKey,
292 value = InitState,
293 db_prefix = DbKeyPrefix::ClientInitState
294);
295
296impl_db_lookup!(
297 key = ClientInitStateKey,
298 query_prefix = ClientInitStatePrefix
299);
300
301#[derive(Debug, Encodable, Decodable, Serialize)]
302pub struct ClientRecoverySnapshot;
303
304#[derive(Debug, Encodable, Decodable, Serialize)]
305pub struct ClientRecoverySnapshotPrefix;
306
307impl_db_record!(
308 key = ClientRecoverySnapshot,
309 value = Option<ClientBackup>,
310 db_prefix = DbKeyPrefix::ClientInitState
311);
312
313impl_db_lookup!(
314 key = ClientRecoverySnapshot,
315 query_prefix = ClientRecoverySnapshotPrefix
316);
317
318#[derive(Debug, Encodable, Decodable, Serialize)]
319pub struct ClientModuleRecovery {
320 pub module_instance_id: ModuleInstanceId,
321}
322
323#[derive(Debug, Encodable)]
324pub struct ClientModuleRecoveryPrefix;
325
326#[derive(Debug, Clone, Encodable, Decodable)]
327pub struct ClientModuleRecoveryState {
328 pub progress: RecoveryProgress,
329}
330
331impl ClientModuleRecoveryState {
332 pub fn is_done(&self) -> bool {
333 self.progress.is_done()
334 }
335}
336
337impl_db_record!(
338 key = ClientModuleRecovery,
339 value = ClientModuleRecoveryState,
340 db_prefix = DbKeyPrefix::ClientInitState,
341);
342
343impl_db_lookup!(
344 key = ClientModuleRecovery,
345 query_prefix = ClientModuleRecoveryPrefix
346);
347
348#[derive(Debug, Encodable, Decodable)]
353pub struct LastBackupKey;
354
355impl_db_record!(
356 key = LastBackupKey,
357 value = ClientBackup,
358 db_prefix = DbKeyPrefix::ClientLastBackup
359);
360
361#[derive(
362 Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
363)]
364pub struct MetaFieldKey(pub String);
365
366#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
367pub struct MetaFieldPrefix;
368
369#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
370pub struct MetaFieldValue(pub String);
371
372#[derive(Encodable, Decodable, Debug)]
373pub struct MetaServiceInfoKey;
374
375#[derive(Encodable, Decodable, Debug)]
376pub struct MetaServiceInfo {
377 pub last_updated: SystemTime,
378 pub revision: u64,
379}
380
381impl_db_record!(
382 key = MetaFieldKey,
383 value = MetaFieldValue,
384 db_prefix = DbKeyPrefix::ClientMetaField
385);
386
387impl_db_record!(
388 key = MetaServiceInfoKey,
389 value = MetaServiceInfo,
390 db_prefix = DbKeyPrefix::ClientMetaServiceInfo
391);
392
393impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
394
395pub type ClientMigrationFn = for<'r, 'tx> fn(
398 &'r mut DatabaseTransaction<'tx>,
399 Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>, ) -> BoxFuture<
402 'r,
403 anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>>,
404>;
405
406pub fn get_core_client_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
407 let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
408 migrations.insert(DatabaseVersion(0), |mut ctx| {
409 Box::pin(async move {
410 let mut dbtx = ctx.dbtx();
411
412 let config_v0 = dbtx
413 .find_by_prefix(&ClientConfigKeyPrefixV0)
414 .await
415 .collect::<Vec<_>>()
416 .await;
417
418 assert!(config_v0.len() <= 1);
419 let Some((id, config_v0)) = config_v0.into_iter().next() else {
420 return Ok(());
421 };
422
423 let global = GlobalClientConfig {
424 api_endpoints: config_v0.global.api_endpoints,
425 broadcast_public_keys: None,
426 consensus_version: config_v0.global.consensus_version,
427 meta: config_v0.global.meta,
428 };
429
430 let config = ClientConfig {
431 global,
432 modules: config_v0.modules,
433 };
434
435 dbtx.remove_entry(&id).await;
436 dbtx.insert_new_entry(&ClientConfigKey, &config).await;
437 Ok(())
438 })
439 });
440
441 migrations
442}
443
444pub async fn apply_migrations_core_client(
445 db: &Database,
446 kind: String,
447 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
448) -> Result<(), anyhow::Error> {
449 apply_migrations(
450 db,
451 kind,
452 migrations,
453 None,
454 Some(DbKeyPrefix::UserData as u8),
455 )
456 .await
457}
458
459pub async fn apply_migrations_client(
469 db: &Database,
470 kind: String,
471 migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>,
472 module_instance_id: ModuleInstanceId,
473) -> Result<(), anyhow::Error> {
474 let mut dbtx = db.begin_transaction_nc().await;
477 let is_new_db = dbtx
478 .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
479 .await?
480 .next()
481 .await
482 .is_none();
483
484 let target_version = get_current_database_version(&migrations);
485
486 create_database_version(
488 db,
489 target_version,
490 Some(module_instance_id),
491 kind.clone(),
492 is_new_db,
493 )
494 .await?;
495
496 let mut global_dbtx = db.begin_transaction().await;
497 let current_version = global_dbtx
498 .get_value(&DatabaseVersionKey(module_instance_id))
499 .await;
500
501 let db_version = if let Some(mut current_version) = current_version {
502 if current_version == target_version {
503 trace!(
504 target: LOG_CLIENT_DB,
505 %current_version,
506 %target_version,
507 module_instance_id,
508 kind,
509 "Database version up to date"
510 );
511 global_dbtx.ignore_uncommitted();
512 return Ok(());
513 }
514
515 if target_version < current_version {
516 return Err(anyhow::anyhow!(format!(
517 "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
518 current_version,
519 target_version,
520 )));
521 }
522
523 info!(
524 target: LOG_CLIENT_DB,
525 %current_version,
526 %target_version,
527 module_instance_id,
528 kind,
529 "Migrating client module database"
530 );
531 let mut active_states =
532 get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
533 let mut inactive_states =
534 get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
535
536 while current_version < target_version {
537 let new_states = if let Some(migration) = migrations.get(¤t_version) {
538 debug!(
539 target: LOG_CLIENT_DB,
540 module_instance_id,
541 %kind,
542 %current_version,
543 %target_version,
544 "Running module db migration");
545
546 migration(
547 &mut global_dbtx
548 .to_ref_with_prefix_module_id(module_instance_id)
549 .0
550 .into_nc(),
551 active_states.clone(),
552 inactive_states.clone(),
553 )
554 .await?
555 } else {
556 warn!(
557 target: LOG_CLIENT_DB,
558 ?current_version, "Missing client db migration");
559 None
560 };
561
562 if let Some((new_active_states, new_inactive_states)) = new_states {
565 remove_old_and_persist_new_active_states(
566 &mut global_dbtx.to_ref_nc(),
567 new_active_states.clone(),
568 active_states.clone(),
569 module_instance_id,
570 )
571 .await;
572 remove_old_and_persist_new_inactive_states(
573 &mut global_dbtx.to_ref_nc(),
574 new_inactive_states.clone(),
575 inactive_states.clone(),
576 module_instance_id,
577 )
578 .await;
579
580 active_states = new_active_states;
582 inactive_states = new_inactive_states;
583 }
584
585 current_version = current_version.increment();
586 global_dbtx
587 .insert_entry(&DatabaseVersionKey(module_instance_id), ¤t_version)
588 .await;
589 }
590
591 current_version
592 } else {
593 target_version
594 };
595
596 global_dbtx.commit_tx_result().await?;
597 debug!(
598 target: LOG_CLIENT_DB,
599 ?kind, ?db_version, "Client DB Version");
600 Ok(())
601}
602
603pub async fn get_active_states(
609 dbtx: &mut DatabaseTransaction<'_>,
610 module_instance_id: ModuleInstanceId,
611) -> Vec<(Vec<u8>, OperationId)> {
612 dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
613 .await
614 .filter_map(|(state, _)| async move {
615 if module_instance_id == state.module_instance_id {
616 Some((state.state, state.operation_id))
617 } else {
618 None
619 }
620 })
621 .collect::<Vec<_>>()
622 .await
623}
624
625pub async fn get_inactive_states(
631 dbtx: &mut DatabaseTransaction<'_>,
632 module_instance_id: ModuleInstanceId,
633) -> Vec<(Vec<u8>, OperationId)> {
634 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
635 .await
636 .filter_map(|(state, _)| async move {
637 if module_instance_id == state.module_instance_id {
638 Some((state.state, state.operation_id))
639 } else {
640 None
641 }
642 })
643 .collect::<Vec<_>>()
644 .await
645}
646
647pub async fn remove_old_and_persist_new_active_states(
651 dbtx: &mut DatabaseTransaction<'_>,
652 new_active_states: Vec<(Vec<u8>, OperationId)>,
653 states_to_remove: Vec<(Vec<u8>, OperationId)>,
654 module_instance_id: ModuleInstanceId,
655) {
656 for (bytes, operation_id) in states_to_remove {
658 dbtx.remove_entry(&ActiveStateKeyBytes {
659 operation_id,
660 module_instance_id,
661 state: bytes,
662 })
663 .await
664 .expect("Did not delete anything");
665 }
666
667 for (bytes, operation_id) in new_active_states {
669 dbtx.insert_new_entry(
670 &ActiveStateKeyBytes {
671 operation_id,
672 module_instance_id,
673 state: bytes,
674 },
675 &ActiveStateMeta::default(),
676 )
677 .await;
678 }
679}
680
681pub async fn remove_old_and_persist_new_inactive_states(
685 dbtx: &mut DatabaseTransaction<'_>,
686 new_inactive_states: Vec<(Vec<u8>, OperationId)>,
687 states_to_remove: Vec<(Vec<u8>, OperationId)>,
688 module_instance_id: ModuleInstanceId,
689) {
690 for (bytes, operation_id) in states_to_remove {
692 dbtx.remove_entry(&InactiveStateKeyBytes {
693 operation_id,
694 module_instance_id,
695 state: bytes,
696 })
697 .await
698 .expect("Did not delete anything");
699 }
700
701 for (bytes, operation_id) in new_inactive_states {
703 dbtx.insert_new_entry(
704 &InactiveStateKeyBytes {
705 operation_id,
706 module_instance_id,
707 state: bytes,
708 },
709 &InactiveStateMeta {
710 created_at: fedimint_core::time::now(),
711 exited_at: fedimint_core::time::now(),
712 },
713 )
714 .await;
715 }
716}
717
718type MigrateStateFn =
720 fn(OperationId, &mut Cursor<&[u8]>) -> anyhow::Result<Option<(Vec<u8>, OperationId)>>;
721
722pub fn migrate_state(
726 active_states: Vec<(Vec<u8>, OperationId)>,
727 inactive_states: Vec<(Vec<u8>, OperationId)>,
728 migrate: MigrateStateFn,
729) -> anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>> {
730 let mut new_active_states = Vec::with_capacity(active_states.len());
731 for (active_state, operation_id) in active_states {
732 let bytes = active_state.as_slice();
733
734 let decoders = ModuleDecoderRegistry::default();
735 let mut cursor = std::io::Cursor::new(bytes);
736 let module_instance_id =
737 fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?;
738
739 let state = match migrate(operation_id, &mut cursor)? {
740 Some((mut state, operation_id)) => {
741 let mut final_state = module_instance_id.to_bytes();
742 final_state.append(&mut state);
743 (final_state, operation_id)
744 }
745 None => (active_state, operation_id),
746 };
747
748 new_active_states.push(state);
749 }
750
751 let mut new_inactive_states = Vec::with_capacity(inactive_states.len());
752 for (inactive_state, operation_id) in inactive_states {
753 let bytes = inactive_state.as_slice();
754
755 let decoders = ModuleDecoderRegistry::default();
756 let mut cursor = std::io::Cursor::new(bytes);
757 let module_instance_id =
758 fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?;
759
760 let state = match migrate(operation_id, &mut cursor)? {
761 Some((mut state, operation_id)) => {
762 let mut final_state = module_instance_id.to_bytes();
763 final_state.append(&mut state);
764 (final_state, operation_id)
765 }
766 None => (inactive_state, operation_id),
767 };
768
769 new_inactive_states.push(state);
770 }
771
772 Ok(Some((new_active_states, new_inactive_states)))
773}