fedimint_client/
db.rs

1use std::collections::BTreeMap;
2use std::io::Cursor;
3use std::time::SystemTime;
4
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
8use fedimint_core::core::{ModuleInstanceId, OperationId};
9use fedimint_core::db::{
10    apply_migrations, create_database_version, get_current_database_version, CoreMigrationFn,
11    Database, DatabaseTransaction, DatabaseValue, DatabaseVersion, DatabaseVersionKey,
12    IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::module::registry::ModuleDecoderRegistry;
16use fedimint_core::module::SupportedApiVersionsSummary;
17use fedimint_core::util::BoxFuture;
18use fedimint_core::{impl_db_lookup, impl_db_record, PeerId};
19use fedimint_eventlog::{
20    EventLogId, UnordedEventLogId, DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
21};
22use fedimint_logging::LOG_CLIENT_DB;
23use futures::StreamExt;
24use serde::{Deserialize, Serialize};
25use strum_macros::EnumIter;
26use tracing::{debug, info, trace, warn};
27
28use crate::backup::{ClientBackup, Metadata};
29use crate::module::recovery::RecoveryProgress;
30use crate::oplog::{OperationLogEntry, OperationLogEntryV0, OperationOutcome};
31use crate::sm::executor::{
32    ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKeyBytes,
33    InactiveStateKeyPrefixBytes,
34};
35use crate::sm::{ActiveStateMeta, InactiveStateMeta};
36
/// Single-byte database key prefixes used by the core client.
///
/// The discriminant value is the first byte of every key stored under the
/// given prefix; these values are part of the on-disk format and must never
/// be reassigned.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    ClientSecret = 0x29, // Unused
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    // Event log prefixes are defined in `fedimint_eventlog` (0x39 / 0x3a).
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,

    /// Arbitrary data of the applications integrating Fedimint client and
    /// wanting to store some Federation-specific data in Fedimint client
    /// database.
    ///
    /// New users are encouraged to use this single prefix only.
    //
    // TODO: https://github.com/fedimint/fedimint/issues/4444
    //       in the future, we should make all global access to the db private
    //       and only expose a getter returning isolated database.
    UserData = 0xb0,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedStart = 0xb1,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedEnd = 0xcf,
    /// 0xd0.. reserved for Fedimint internal use
    InternalReservedStart = 0xd0,
    /// Per-module instance data
    ModuleGlobalPrefix = 0xff,
}
80
81impl std::fmt::Display for DbKeyPrefix {
82    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83        write!(f, "{self:?}")
84    }
85}
86
/// Key under which the encoded client secret bytes are stored.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

/// Prefix for scanning [`EncodedClientSecretKey`] records.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
102
/// Key of an operation log entry, addressed by its [`OperationId`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

/// Prefix for scanning all operation log entries.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);
118
/// Legacy operation log key, paired with [`OperationLogEntryV0`] values.
///
/// Shares the `OperationLog` prefix with [`OperationLogKey`]; entries in this
/// format are rewritten by the version 1 -> 2 migration in
/// [`get_core_client_database_migrations`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

/// Prefix for scanning V0-format operation log entries.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);
137
/// Key under which an 8-byte hash of the client's pre-root secret is stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);
146
/// Key used to lookup operation log entries in chronological order
///
/// The creation time is the leading key component, so a prefix scan yields
/// operations ordered by time. The value is `()` — this table is an index
/// only; the entry itself lives under [`OperationLogKey`].
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

/// Prefix for scanning the chronological operation index.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
167
/// Key under which the cached common API version set is stored.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

/// Cached [`ApiVersionSet`] shared with the federation.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);
179
/// Key for the last known API versions summary of a given peer.
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

/// Cached [`SupportedApiVersionsSummary`] reported by a peer.
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);
191
/// Key under which the client's [`ClientConfig`] is stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);
200
/// Legacy client config key, keyed by [`FederationId`].
///
/// Shares the `ClientConfig` prefix with [`ClientConfigKey`]; entries in this
/// format are replaced by the version 0 -> 1 migration in
/// [`get_core_client_database_migrations`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

/// Prefix for scanning V0-format client config records.
#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);
219
/// Key under which an optional API secret string is stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

/// Prefix for scanning [`ApiSecretKey`] records.
#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
233
/// Client metadata that will be stored/restored on backup&recovery
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

/// Prefix for scanning [`ClientMetadataKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
248
/// Does the client modules need to run recovery before being usable?
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

/// Prefix for scanning [`ClientInitStateKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// Client initialization mode
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Should only be used with freshly generated root secret
    Fresh,
    /// Should be used with root secrets provided by the user to recover a
    /// (even if just possibly) already used secret.
    Recover { snapshot: Option<ClientBackup> },
}
265
/// Like `InitMode`, but without no longer required data.
///
/// This is distinct from `InitMode` to prevent holding on to `snapshot`
/// forever both for user's privacy and space use. In case user get hacked
/// or phone gets stolen.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    /// Initialization of a freshly generated root secret finished
    Fresh,
    /// Recovery of a user-provided root secret finished
    Recover,
}

/// The state of the client initialization
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Client data initialization might still require some work (e.g. client
    /// recovery)
    Pending(InitMode),
    /// Client initialization was complete
    Complete(InitModeComplete),
}
286
287impl InitState {
288    pub fn into_complete(self) -> Self {
289        match self {
290            InitState::Pending(p) => InitState::Complete(match p {
291                InitMode::Fresh => InitModeComplete::Fresh,
292                InitMode::Recover { .. } => InitModeComplete::Recover,
293            }),
294            InitState::Complete(t) => InitState::Complete(t),
295        }
296    }
297
298    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
299        match self {
300            InitState::Pending(p) => match p {
301                InitMode::Fresh => None,
302                InitMode::Recover { snapshot } => Some(snapshot.clone()),
303            },
304            InitState::Complete(_) => None,
305        }
306    }
307
308    pub fn is_pending(&self) -> bool {
309        match self {
310            InitState::Pending(_) => true,
311            InitState::Complete(_) => false,
312        }
313    }
314}
315
// `ClientInitStateKey` maps to the current `InitState` under the
// `ClientInitState` prefix (shared with the recovery records below).
impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);
326
/// Key for the backup snapshot used during recovery, if any.
///
/// Stored under the `ClientInitState` prefix alongside [`ClientInitStateKey`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshot;

/// Prefix for scanning [`ClientRecoverySnapshot`] records.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshotPrefix;

impl_db_record!(
    key = ClientRecoverySnapshot,
    value = Option<ClientBackup>,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientRecoverySnapshot,
    query_prefix = ClientRecoverySnapshotPrefix
);
343
/// Key for per-module recovery progress, addressed by module instance.
///
/// Stored under the `ClientInitState` prefix alongside [`ClientInitStateKey`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

/// Prefix for scanning [`ClientModuleRecovery`] records.
#[derive(Debug, Encodable)]
pub struct ClientModuleRecoveryPrefix;

/// Recovery progress of a single client module.
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    /// True when the underlying [`RecoveryProgress`] reports completion.
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientInitState,
);

impl_db_lookup!(
    key = ClientModuleRecovery,
    query_prefix = ClientModuleRecoveryPrefix
);
373
/// Last valid backup the client attempted to make
///
/// Can be used to find previous valid versions of
/// module backup.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);
386
/// Key of a single federation meta field, addressed by field name.
#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub struct MetaFieldKey(pub String);

/// Prefix for scanning all [`MetaFieldKey`] records.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct MetaFieldPrefix;

/// String value of a meta field.
#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub struct MetaFieldValue(pub String);

/// Key under which the meta service bookkeeping info is stored.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

/// Bookkeeping for the meta service: when fields were last updated and the
/// revision they correspond to.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
420
/// `ClientMigrationFn` is a function that modules can implement to "migrate"
/// the database to the next database version.
///
/// Receives the module's database transaction plus the serialized active and
/// inactive state machines; returns `Ok(None)` when no state machine
/// migration happened, or the new (active, inactive) state vectors that
/// should replace the old ones.
pub type ClientMigrationFn = for<'r, 'tx> fn(
    &'r mut DatabaseTransaction<'tx>,
    Vec<(Vec<u8>, OperationId)>, // active states
    Vec<(Vec<u8>, OperationId)>, // inactive states
) -> BoxFuture<
    'r,
    anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>>,
>;
431
/// Returns the core client database migrations, keyed by the version they
/// migrate *from*.
pub fn get_core_client_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
    // v0 -> v1: replace the federation-id-keyed `ClientConfigV0` record with
    // the unkeyed `ClientConfig` record (`broadcast_public_keys` starts empty).
    migrations.insert(DatabaseVersion(0), |mut ctx| {
        Box::pin(async move {
            let mut dbtx = ctx.dbtx();

            let config_v0 = dbtx
                .find_by_prefix(&ClientConfigKeyPrefixV0)
                .await
                .collect::<Vec<_>>()
                .await;

            // At most one config entry is expected under this prefix.
            assert!(config_v0.len() <= 1);
            let Some((id, config_v0)) = config_v0.into_iter().next() else {
                // Fresh database: nothing to migrate.
                return Ok(());
            };

            let global = GlobalClientConfig {
                api_endpoints: config_v0.global.api_endpoints,
                // Field did not exist in the V0 format.
                broadcast_public_keys: None,
                consensus_version: config_v0.global.consensus_version,
                meta: config_v0.global.meta,
            };

            let config = ClientConfig {
                global,
                modules: config_v0.modules,
            };

            dbtx.remove_entry(&id).await;
            dbtx.insert_new_entry(&ClientConfigKey, &config).await;
            Ok(())
        })
    });

    // Migration to add outcome_time to OperationLogEntry
    migrations.insert(DatabaseVersion(1), |mut ctx| {
        Box::pin(async move {
            let mut dbtx = ctx.dbtx();

            // Read all OperationLogEntries using V0 format
            let operation_logs = dbtx
                .find_by_prefix(&OperationLogKeyPrefixV0)
                .await
                .collect::<Vec<_>>()
                .await;

            // Build a map from operation_id -> max_time of inactive state
            let mut op_id_max_time = BTreeMap::new();

            // Process inactive states
            {
                let mut inactive_states_stream =
                    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                // Track the latest `exited_at` per operation; used as the
                // best available approximation of the outcome time.
                while let Some((state, meta)) = inactive_states_stream.next().await {
                    let entry = op_id_max_time
                        .entry(state.operation_id)
                        .or_insert(meta.exited_at);
                    *entry = (*entry).max(meta.exited_at);
                }
            }
            // Migrate each V0 operation log entry to the new format
            for (op_key_v0, log_entry_v0) in operation_logs {
                let new_entry = OperationLogEntry {
                    operation_module_kind: log_entry_v0.operation_module_kind,
                    meta: log_entry_v0.meta,
                    outcome: log_entry_v0.outcome.map(|outcome| {
                        OperationOutcome {
                            outcome,
                            // If we found state times, use the max, otherwise use
                            // current time
                            time: op_id_max_time
                                .get(&op_key_v0.operation_id)
                                .copied()
                                .unwrap_or_else(fedimint_core::time::now),
                        }
                    }),
                };

                // Same `OperationLog` prefix for V0 and current keys, so the
                // old record is removed before the new one is inserted.
                dbtx.remove_entry(&op_key_v0).await;
                dbtx.insert_entry(
                    &OperationLogKey {
                        operation_id: op_key_v0.operation_id,
                    },
                    &new_entry,
                )
                .await;
            }

            Ok(())
        })
    });

    // Fix #6948
    // v2 -> v3: event log entries were written under the wrong prefix; move
    // each entry whose key fails to decode for its table over to the sibling
    // table (0x39 = ordered, 0x3a = unordered).
    migrations.insert(DatabaseVersion(2), |mut ctx| {
        Box::pin(async move {
            let mut dbtx = ctx.dbtx();

            // Migrate unordered keys that got written to ordered table
            {
                let mut ordered_log_entries = dbtx
                    .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                    .await
                    .expect("DB operation failed");
                let mut keys_to_migrate = vec![];
                while let Some((k, _v)) = ordered_log_entries.next().await {
                    trace!(target: LOG_CLIENT_DB,
                        k=%k.as_hex(),
                        "Checking ordered log key"
                    );
                    // Keys that do not decode as an ordered `EventLogId` must
                    // be misplaced unordered ids (asserted below).
                    if EventLogId::consensus_decode_whole(
                        &k[1..],
                        &ModuleDecoderRegistry::default(),
                    )
                    .is_err()
                    {
                        assert!(UnordedEventLogId::consensus_decode_whole(
                            &k[1..],
                            &ModuleDecoderRegistry::default()
                        )
                        .is_ok());
                        keys_to_migrate.push(k);
                    }
                }
                // Release the stream's borrow of `dbtx` before mutating.
                drop(ordered_log_entries);
                for mut key_to_migrate in keys_to_migrate {
                    warn!(target: LOG_CLIENT_DB,
                        k=%key_to_migrate.as_hex(),
                        "Migrating unordered event log entry written to an ordered log"
                    );
                    let v = dbtx
                        .raw_remove_entry(&key_to_migrate)
                        .await
                        .expect("DB operation failed")
                        .expect("Was there a moment ago");
                    // Rewrite the prefix byte: 0x39 (ordered) -> 0x3a (unordered).
                    assert_eq!(key_to_migrate[0], 0x39);
                    key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                    assert_eq!(key_to_migrate[0], 0x3a);
                    dbtx.raw_insert_bytes(&key_to_migrate, &v)
                        .await
                        .expect("DB operation failed");
                }
            }

            // Migrate ordered keys that got written to unordered table
            {
                let mut unordered_log_entries = dbtx
                    .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                    .await
                    .expect("DB operation failed");
                let mut keys_to_migrate = vec![];
                while let Some((k, _v)) = unordered_log_entries.next().await {
                    trace!(target: LOG_CLIENT_DB,
                        k=%k.as_hex(),
                        "Checking unordered log key"
                    );
                    // Mirror image of the check above.
                    if UnordedEventLogId::consensus_decode_whole(
                        &k[1..],
                        &ModuleDecoderRegistry::default(),
                    )
                    .is_err()
                    {
                        assert!(EventLogId::consensus_decode_whole(
                            &k[1..],
                            &ModuleDecoderRegistry::default()
                        )
                        .is_ok());
                        keys_to_migrate.push(k);
                    }
                }
                // Release the stream's borrow of `dbtx` before mutating.
                drop(unordered_log_entries);
                for mut key_to_migrate in keys_to_migrate {
                    warn!(target: LOG_CLIENT_DB,
                        k=%key_to_migrate.as_hex(),
                        "Migrating ordered event log entry written to an unordered log"
                    );
                    let v = dbtx
                        .raw_remove_entry(&key_to_migrate)
                        .await
                        .expect("DB operation failed")
                        .expect("Was there a moment ago");
                    // Rewrite the prefix byte: 0x3a (unordered) -> 0x39 (ordered).
                    assert_eq!(key_to_migrate[0], 0x3a);
                    key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                    assert_eq!(key_to_migrate[0], 0x39);
                    dbtx.raw_insert_bytes(&key_to_migrate, &v)
                        .await
                        .expect("DB operation failed");
                }
            }
            Ok(())
        })
    });
    migrations
}
627
628pub async fn apply_migrations_core_client(
629    db: &Database,
630    kind: String,
631    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
632) -> Result<(), anyhow::Error> {
633    apply_migrations(
634        db,
635        kind,
636        migrations,
637        None,
638        Some(DbKeyPrefix::UserData as u8),
639    )
640    .await
641}
642
/// `apply_migrations_client` iterates from the on disk database version for the
/// client module up to `target_db_version` and executes all of the migrations
/// that exist in the migrations map, including state machine migrations.
/// Each migration in the migrations map updates the database to have the
/// correct on-disk data structures that the code is expecting. The entire
/// process is atomic, (i.e migration from 0->1 and 1->2 happen atomically).
/// This function is called before the module is initialized and as long as the
/// correct migrations are supplied in the migrations map, the module
/// will be able to read and write from the database successfully.
pub async fn apply_migrations_client(
    db: &Database,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Newly created databases will not have any data underneath the
    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
    let mut dbtx = db.begin_transaction_nc().await;
    let is_new_db = dbtx
        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
        .await?
        .next()
        .await
        .is_none();

    // Highest version present in the migration map.
    let target_version = get_current_database_version(&migrations);

    // First write the database version to disk if it does not exist.
    create_database_version(
        db,
        target_version,
        Some(module_instance_id),
        kind.clone(),
        is_new_db,
    )
    .await?;

    // All migration steps run inside this single committable transaction,
    // which makes the whole upgrade atomic.
    let mut global_dbtx = db.begin_transaction().await;
    let current_version = global_dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    let db_version = if let Some(mut current_version) = current_version {
        if current_version == target_version {
            trace!(
                target: LOG_CLIENT_DB,
                %current_version,
                %target_version,
                module_instance_id,
                kind,
                "Database version up to date"
            );
            // Nothing to migrate; discard the (empty) transaction.
            global_dbtx.ignore_uncommitted();
            return Ok(());
        }

        // Downgrades are not supported.
        if target_version < current_version {
            return Err(anyhow::anyhow!(format!(
                "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
                current_version,
                target_version,
            )));
        }

        info!(
            target: LOG_CLIENT_DB,
            %current_version,
            %target_version,
            module_instance_id,
            kind,
            "Migrating client module database"
        );
        // Serialized state machines are loaded once up front and threaded
        // through every migration step below.
        let mut active_states =
            get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;

        // Apply migrations one version at a time until the target is reached.
        while current_version < target_version {
            let new_states = if let Some(migration) = migrations.get(&current_version) {
                debug!(
                     target: LOG_CLIENT_DB,
                     module_instance_id,
                     %kind,
                     %current_version,
                     %target_version,
                     "Running module db migration");

                // Each migration only sees the module's isolated key space.
                migration(
                    &mut global_dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    active_states.clone(),
                    inactive_states.clone(),
                )
                .await?
            } else {
                // Missing step: version is still bumped below; only a warning
                // is emitted.
                warn!(
                    target: LOG_CLIENT_DB,
                    ?current_version, "Missing client db migration");
                None
            };

            // If the client migration returned new states, a state machine migration has
            // occurred, and the new states need to be persisted to the database.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                remove_old_and_persist_new_active_states(
                    &mut global_dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states.clone(),
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut global_dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states.clone(),
                    module_instance_id,
                )
                .await;

                // the new states become the old states for the next migration
                active_states = new_active_states;
                inactive_states = new_inactive_states;
            }

            current_version = current_version.increment();
            global_dbtx
                .insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
                .await;
        }

        current_version
    } else {
        // No version record on disk: `create_database_version` above has
        // already established the target version.
        target_version
    };

    global_dbtx.commit_tx_result().await?;
    debug!(
        target: LOG_CLIENT_DB,
        ?kind, ?db_version, "Client DB Version");
    Ok(())
}
786
787/// Reads all active states from the database and returns `Vec<DynState>`.
788/// TODO: It is unfortunate that we can't read states by the module's instance
789/// id so we are forced to return all active states. Once we do a db migration
790/// to add `module_instance_id` to `ActiveStateKey`, this can be improved to
791/// only read the module's relevant states.
792pub async fn get_active_states(
793    dbtx: &mut DatabaseTransaction<'_>,
794    module_instance_id: ModuleInstanceId,
795) -> Vec<(Vec<u8>, OperationId)> {
796    dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
797        .await
798        .filter_map(|(state, _)| async move {
799            if module_instance_id == state.module_instance_id {
800                Some((state.state, state.operation_id))
801            } else {
802                None
803            }
804        })
805        .collect::<Vec<_>>()
806        .await
807}
808
809/// Reads all inactive states from the database and returns `Vec<DynState>`.
810/// TODO: It is unfortunate that we can't read states by the module's instance
811/// id so we are forced to return all inactive states. Once we do a db migration
812/// to add `module_instance_id` to `InactiveStateKey`, this can be improved to
813/// only read the module's relevant states.
814pub async fn get_inactive_states(
815    dbtx: &mut DatabaseTransaction<'_>,
816    module_instance_id: ModuleInstanceId,
817) -> Vec<(Vec<u8>, OperationId)> {
818    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
819        .await
820        .filter_map(|(state, _)| async move {
821            if module_instance_id == state.module_instance_id {
822                Some((state.state, state.operation_id))
823            } else {
824                None
825            }
826        })
827        .collect::<Vec<_>>()
828        .await
829}
830
831/// Persists new active states by first removing all current active states, and
832/// re-writing with the new set of active states. `new_active_states` is
833/// expected to contain all active states, not just the newly created states.
834pub async fn remove_old_and_persist_new_active_states(
835    dbtx: &mut DatabaseTransaction<'_>,
836    new_active_states: Vec<(Vec<u8>, OperationId)>,
837    states_to_remove: Vec<(Vec<u8>, OperationId)>,
838    module_instance_id: ModuleInstanceId,
839) {
840    // Remove all existing active states
841    for (bytes, operation_id) in states_to_remove {
842        dbtx.remove_entry(&ActiveStateKeyBytes {
843            operation_id,
844            module_instance_id,
845            state: bytes,
846        })
847        .await
848        .expect("Did not delete anything");
849    }
850
851    // Insert new "migrated" active states
852    for (bytes, operation_id) in new_active_states {
853        dbtx.insert_new_entry(
854            &ActiveStateKeyBytes {
855                operation_id,
856                module_instance_id,
857                state: bytes,
858            },
859            &ActiveStateMeta::default(),
860        )
861        .await;
862    }
863}
864
865/// Persists new inactive states by first removing all current inactive states,
866/// and re-writing with the new set of inactive states. `new_inactive_states` is
867/// expected to contain all inactive states, not just the newly created states.
868pub async fn remove_old_and_persist_new_inactive_states(
869    dbtx: &mut DatabaseTransaction<'_>,
870    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
871    states_to_remove: Vec<(Vec<u8>, OperationId)>,
872    module_instance_id: ModuleInstanceId,
873) {
874    // Remove all existing active states
875    for (bytes, operation_id) in states_to_remove {
876        dbtx.remove_entry(&InactiveStateKeyBytes {
877            operation_id,
878            module_instance_id,
879            state: bytes,
880        })
881        .await
882        .expect("Did not delete anything");
883    }
884
885    // Insert new "migrated" inactive states
886    for (bytes, operation_id) in new_inactive_states {
887        dbtx.insert_new_entry(
888            &InactiveStateKeyBytes {
889                operation_id,
890                module_instance_id,
891                state: bytes,
892            },
893            &InactiveStateMeta {
894                created_at: fedimint_core::time::now(),
895                exited_at: fedimint_core::time::now(),
896            },
897        )
898        .await;
899    }
900}
901
/// Helper function definition for migrating a single state.
///
/// Receives the state's operation id and a cursor positioned just past the
/// `ModuleInstanceId` prefix of the serialized state. Returns `Ok(None)` when
/// the state needs no migration, or the migrated state bytes (without the
/// instance-id prefix) together with its operation id.
type MigrateStateFn =
    fn(OperationId, &mut Cursor<&[u8]>) -> anyhow::Result<Option<(Vec<u8>, OperationId)>>;
905
906/// Migrates a particular state by looping over all active and inactive states.
907/// If the `migrate` closure returns `None`, this state was not migrated and
908/// should be added to the new state machine vectors.
909pub fn migrate_state(
910    active_states: Vec<(Vec<u8>, OperationId)>,
911    inactive_states: Vec<(Vec<u8>, OperationId)>,
912    migrate: MigrateStateFn,
913) -> anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>> {
914    let mut new_active_states = Vec::with_capacity(active_states.len());
915    for (active_state, operation_id) in active_states {
916        let bytes = active_state.as_slice();
917
918        let decoders = ModuleDecoderRegistry::default();
919        let mut cursor = std::io::Cursor::new(bytes);
920        let module_instance_id = fedimint_core::core::ModuleInstanceId::consensus_decode_partial(
921            &mut cursor,
922            &decoders,
923        )?;
924
925        let state = match migrate(operation_id, &mut cursor)? {
926            Some((mut state, operation_id)) => {
927                let mut final_state = module_instance_id.to_bytes();
928                final_state.append(&mut state);
929                (final_state, operation_id)
930            }
931            None => (active_state, operation_id),
932        };
933
934        new_active_states.push(state);
935    }
936
937    let mut new_inactive_states = Vec::with_capacity(inactive_states.len());
938    for (inactive_state, operation_id) in inactive_states {
939        let bytes = inactive_state.as_slice();
940
941        let decoders = ModuleDecoderRegistry::default();
942        let mut cursor = std::io::Cursor::new(bytes);
943        let module_instance_id = fedimint_core::core::ModuleInstanceId::consensus_decode_partial(
944            &mut cursor,
945            &decoders,
946        )?;
947
948        let state = match migrate(operation_id, &mut cursor)? {
949            Some((mut state, operation_id)) => {
950                let mut final_state = module_instance_id.to_bytes();
951                final_state.append(&mut state);
952                (final_state, operation_id)
953            }
954            None => (inactive_state, operation_id),
955        };
956
957        new_inactive_states.push(state);
958    }
959
960    Ok(Some((new_active_states, new_inactive_states)))
961}