fedimint_client/
db.rs

1pub mod event_log;
2
3use std::collections::BTreeMap;
4use std::io::Cursor;
5use std::time::SystemTime;
6
7use fedimint_api_client::api::ApiVersionSet;
8use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
9use fedimint_core::core::{ModuleInstanceId, OperationId};
10use fedimint_core::db::{
11    apply_migrations, create_database_version, get_current_database_version, CoreMigrationFn,
12    Database, DatabaseTransaction, DatabaseValue, DatabaseVersion, DatabaseVersionKey,
13    IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
14};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::module::registry::ModuleDecoderRegistry;
17use fedimint_core::module::SupportedApiVersionsSummary;
18use fedimint_core::util::BoxFuture;
19use fedimint_core::{impl_db_lookup, impl_db_record, PeerId};
20use fedimint_logging::LOG_CLIENT_DB;
21use futures::StreamExt;
22use serde::{Deserialize, Serialize};
23use strum_macros::EnumIter;
24use tracing::{debug, info, trace, warn};
25
26use crate::backup::{ClientBackup, Metadata};
27use crate::module::recovery::RecoveryProgress;
28use crate::oplog::OperationLogEntry;
29use crate::sm::executor::{
30    ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKeyBytes,
31    InactiveStateKeyPrefixBytes,
32};
33use crate::sm::{ActiveStateMeta, InactiveStateMeta};
34
35#[repr(u8)]
36#[derive(Clone, EnumIter, Debug)]
37pub enum DbKeyPrefix {
38    EncodedClientSecret = 0x28,
39    ClientSecret = 0x29, // Unused
40    ClientPreRootSecretHash = 0x2a,
41    OperationLog = 0x2c,
42    ChronologicalOperationLog = 0x2d,
43    CommonApiVersionCache = 0x2e,
44    ClientConfig = 0x2f,
45    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
46    ClientInitState = 0x31,
47    ClientMetadata = 0x32,
48    ClientLastBackup = 0x33,
49    ClientMetaField = 0x34,
50    ClientMetaServiceInfo = 0x35,
51    ApiSecret = 0x36,
52    PeerLastApiVersionsSummaryCache = 0x37,
53    ApiUrlAnnouncement = 0x38,
54    EventLog = 0x39,
55    UnorderedEventLog = 0x3a,
56
57    /// Arbitrary data of the applications integrating Fedimint client and
58    /// wanting to store some Federation-specific data in Fedimint client
59    /// database.
60    ///
61    /// New users are encouraged to use this single prefix only.
62    //
63    // TODO: https://github.com/fedimint/fedimint/issues/4444
64    //       in the future, we should make all global access to the db private
65    //       and only expose a getter returning isolated database.
66    UserData = 0xb0,
67    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
68    /// historical and future external use
69    ExternalReservedStart = 0xb1,
70    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
71    /// historical and future external use
72    ExternalReservedEnd = 0xcf,
73    /// 0xd0.. reserved for Fedimint internal use
74    InternalReservedStart = 0xd0,
75    /// Per-module instance data
76    ModuleGlobalPrefix = 0xff,
77}
78
79impl std::fmt::Display for DbKeyPrefix {
80    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
81        write!(f, "{self:?}")
82    }
83}
84
85#[derive(Debug, Encodable, Decodable)]
86pub struct EncodedClientSecretKey;
87
88#[derive(Debug, Encodable, Decodable)]
89pub struct EncodedClientSecretKeyPrefix;
90
91impl_db_record!(
92    key = EncodedClientSecretKey,
93    value = Vec<u8>,
94    db_prefix = DbKeyPrefix::EncodedClientSecret,
95);
96impl_db_lookup!(
97    key = EncodedClientSecretKey,
98    query_prefix = EncodedClientSecretKeyPrefix
99);
100
101#[derive(Debug, Encodable, Decodable, Serialize)]
102pub struct OperationLogKey {
103    pub operation_id: OperationId,
104}
105
106impl_db_record!(
107    key = OperationLogKey,
108    value = OperationLogEntry,
109    db_prefix = DbKeyPrefix::OperationLog
110);
111
112#[derive(Debug, Encodable, Decodable, Serialize)]
113pub struct ClientPreRootSecretHashKey;
114
115impl_db_record!(
116    key = ClientPreRootSecretHashKey,
117    value = [u8; 8],
118    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
119);
120
121/// Key used to lookup operation log entries in chronological order
122#[derive(Debug, Clone, Copy, Encodable, Decodable, Serialize)]
123pub struct ChronologicalOperationLogKey {
124    pub creation_time: std::time::SystemTime,
125    pub operation_id: OperationId,
126}
127
128#[derive(Debug, Encodable)]
129pub struct ChronologicalOperationLogKeyPrefix;
130
131impl_db_record!(
132    key = ChronologicalOperationLogKey,
133    value = (),
134    db_prefix = DbKeyPrefix::ChronologicalOperationLog
135);
136
137impl_db_lookup!(
138    key = ChronologicalOperationLogKey,
139    query_prefix = ChronologicalOperationLogKeyPrefix
140);
141
142#[derive(Debug, Encodable, Decodable)]
143pub struct CachedApiVersionSetKey;
144
145#[derive(Debug, Encodable, Decodable)]
146pub struct CachedApiVersionSet(pub ApiVersionSet);
147
148impl_db_record!(
149    key = CachedApiVersionSetKey,
150    value = CachedApiVersionSet,
151    db_prefix = DbKeyPrefix::CommonApiVersionCache
152);
153
154#[derive(Debug, Encodable, Decodable)]
155pub struct PeerLastApiVersionsSummaryKey(pub PeerId);
156
157#[derive(Debug, Encodable, Decodable)]
158pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);
159
160impl_db_record!(
161    key = PeerLastApiVersionsSummaryKey,
162    value = PeerLastApiVersionsSummary,
163    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
164);
165
166#[derive(Debug, Encodable, Decodable, Serialize)]
167pub struct ClientConfigKey;
168
169impl_db_record!(
170    key = ClientConfigKey,
171    value = ClientConfig,
172    db_prefix = DbKeyPrefix::ClientConfig
173);
174
175#[derive(Debug, Encodable, Decodable, Serialize)]
176pub struct ClientConfigKeyV0 {
177    pub id: FederationId,
178}
179
180#[derive(Debug, Encodable)]
181pub struct ClientConfigKeyPrefixV0;
182
183impl_db_record!(
184    key = ClientConfigKeyV0,
185    value = ClientConfigV0,
186    db_prefix = DbKeyPrefix::ClientConfig
187);
188
189impl_db_lookup!(
190    key = ClientConfigKeyV0,
191    query_prefix = ClientConfigKeyPrefixV0
192);
193
194#[derive(Debug, Encodable, Decodable, Serialize)]
195pub struct ApiSecretKey;
196
197#[derive(Debug, Encodable)]
198pub struct ApiSecretKeyPrefix;
199
200impl_db_record!(
201    key = ApiSecretKey,
202    value = String,
203    db_prefix = DbKeyPrefix::ApiSecret
204);
205
206impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
207
208/// Client metadata that will be stored/restored on backup&recovery
209#[derive(Debug, Encodable, Decodable, Serialize)]
210pub struct ClientMetadataKey;
211
212#[derive(Debug, Encodable)]
213pub struct ClientMetadataPrefix;
214
215impl_db_record!(
216    key = ClientMetadataKey,
217    value = Metadata,
218    db_prefix = DbKeyPrefix::ClientMetadata
219);
220
221impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
222
223/// Does the client modules need to run recovery before being usable?
224#[derive(Debug, Encodable, Decodable, Serialize)]
225pub struct ClientInitStateKey;
226
227#[derive(Debug, Encodable)]
228pub struct ClientInitStatePrefix;
229
230/// Client initialization mode
231#[derive(Debug, Encodable, Decodable)]
232pub enum InitMode {
233    /// Should only be used with freshly generated root secret
234    Fresh,
235    /// Should be used with root secrets provided by the user to recover a
236    /// (even if just possibly) already used secret.
237    Recover { snapshot: Option<ClientBackup> },
238}
239
240/// Like `InitMode`, but without no longer required data.
241///
242/// This is distinct from `InitMode` to prevent holding on to `snapshot`
243/// forever both for user's privacy and space use. In case user get hacked
244/// or phone gets stolen.
245#[derive(Debug, Encodable, Decodable)]
246pub enum InitModeComplete {
247    Fresh,
248    Recover,
249}
250
251/// The state of the client initialization
252#[derive(Debug, Encodable, Decodable)]
253pub enum InitState {
254    /// Client data initialization might still require some work (e.g. client
255    /// recovery)
256    Pending(InitMode),
257    /// Client initialization was complete
258    Complete(InitModeComplete),
259}
260
261impl InitState {
262    pub fn into_complete(self) -> Self {
263        match self {
264            InitState::Pending(p) => InitState::Complete(match p {
265                InitMode::Fresh => InitModeComplete::Fresh,
266                InitMode::Recover { .. } => InitModeComplete::Recover,
267            }),
268            InitState::Complete(t) => InitState::Complete(t),
269        }
270    }
271
272    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
273        match self {
274            InitState::Pending(p) => match p {
275                InitMode::Fresh => None,
276                InitMode::Recover { snapshot } => Some(snapshot.clone()),
277            },
278            InitState::Complete(_) => None,
279        }
280    }
281
282    pub fn is_pending(&self) -> bool {
283        match self {
284            InitState::Pending(_) => true,
285            InitState::Complete(_) => false,
286        }
287    }
288}
289
290impl_db_record!(
291    key = ClientInitStateKey,
292    value = InitState,
293    db_prefix = DbKeyPrefix::ClientInitState
294);
295
296impl_db_lookup!(
297    key = ClientInitStateKey,
298    query_prefix = ClientInitStatePrefix
299);
300
301#[derive(Debug, Encodable, Decodable, Serialize)]
302pub struct ClientRecoverySnapshot;
303
304#[derive(Debug, Encodable, Decodable, Serialize)]
305pub struct ClientRecoverySnapshotPrefix;
306
307impl_db_record!(
308    key = ClientRecoverySnapshot,
309    value = Option<ClientBackup>,
310    db_prefix = DbKeyPrefix::ClientInitState
311);
312
313impl_db_lookup!(
314    key = ClientRecoverySnapshot,
315    query_prefix = ClientRecoverySnapshotPrefix
316);
317
318#[derive(Debug, Encodable, Decodable, Serialize)]
319pub struct ClientModuleRecovery {
320    pub module_instance_id: ModuleInstanceId,
321}
322
323#[derive(Debug, Encodable)]
324pub struct ClientModuleRecoveryPrefix;
325
326#[derive(Debug, Clone, Encodable, Decodable)]
327pub struct ClientModuleRecoveryState {
328    pub progress: RecoveryProgress,
329}
330
331impl ClientModuleRecoveryState {
332    pub fn is_done(&self) -> bool {
333        self.progress.is_done()
334    }
335}
336
337impl_db_record!(
338    key = ClientModuleRecovery,
339    value = ClientModuleRecoveryState,
340    db_prefix = DbKeyPrefix::ClientInitState,
341);
342
343impl_db_lookup!(
344    key = ClientModuleRecovery,
345    query_prefix = ClientModuleRecoveryPrefix
346);
347
348/// Last valid backup the client attempted to make
349///
350/// Can be used to find previous valid versions of
351/// module backup.
352#[derive(Debug, Encodable, Decodable)]
353pub struct LastBackupKey;
354
355impl_db_record!(
356    key = LastBackupKey,
357    value = ClientBackup,
358    db_prefix = DbKeyPrefix::ClientLastBackup
359);
360
361#[derive(
362    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
363)]
364pub struct MetaFieldKey(pub String);
365
366#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
367pub struct MetaFieldPrefix;
368
369#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
370pub struct MetaFieldValue(pub String);
371
372#[derive(Encodable, Decodable, Debug)]
373pub struct MetaServiceInfoKey;
374
375#[derive(Encodable, Decodable, Debug)]
376pub struct MetaServiceInfo {
377    pub last_updated: SystemTime,
378    pub revision: u64,
379}
380
381impl_db_record!(
382    key = MetaFieldKey,
383    value = MetaFieldValue,
384    db_prefix = DbKeyPrefix::ClientMetaField
385);
386
387impl_db_record!(
388    key = MetaServiceInfoKey,
389    value = MetaServiceInfo,
390    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
391);
392
393impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
394
395/// `ClientMigrationFn` is a function that modules can implement to "migrate"
396/// the database to the next database version.
397pub type ClientMigrationFn = for<'r, 'tx> fn(
398    &'r mut DatabaseTransaction<'tx>,
399    Vec<(Vec<u8>, OperationId)>, // active states
400    Vec<(Vec<u8>, OperationId)>, // inactive states
401) -> BoxFuture<
402    'r,
403    anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>>,
404>;
405
406pub fn get_core_client_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
407    let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
408    migrations.insert(DatabaseVersion(0), |mut ctx| {
409        Box::pin(async move {
410            let mut dbtx = ctx.dbtx();
411
412            let config_v0 = dbtx
413                .find_by_prefix(&ClientConfigKeyPrefixV0)
414                .await
415                .collect::<Vec<_>>()
416                .await;
417
418            assert!(config_v0.len() <= 1);
419            let Some((id, config_v0)) = config_v0.into_iter().next() else {
420                return Ok(());
421            };
422
423            let global = GlobalClientConfig {
424                api_endpoints: config_v0.global.api_endpoints,
425                broadcast_public_keys: None,
426                consensus_version: config_v0.global.consensus_version,
427                meta: config_v0.global.meta,
428            };
429
430            let config = ClientConfig {
431                global,
432                modules: config_v0.modules,
433            };
434
435            dbtx.remove_entry(&id).await;
436            dbtx.insert_new_entry(&ClientConfigKey, &config).await;
437            Ok(())
438        })
439    });
440
441    migrations
442}
443
444pub async fn apply_migrations_core_client(
445    db: &Database,
446    kind: String,
447    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
448) -> Result<(), anyhow::Error> {
449    apply_migrations(
450        db,
451        kind,
452        migrations,
453        None,
454        Some(DbKeyPrefix::UserData as u8),
455    )
456    .await
457}
458
459/// `apply_migrations_client` iterates from the on disk database version for the
460/// client module up to `target_db_version` and executes all of the migrations
461/// that exist in the migrations map, including state machine migrations.
462/// Each migration in the migrations map updates the database to have the
463/// correct on-disk data structures that the code is expecting. The entire
464/// process is atomic, (i.e migration from 0->1 and 1->2 happen atomically).
465/// This function is called before the module is initialized and as long as the
466/// correct migrations are supplied in the migrations map, the module
467/// will be able to read and write from the database successfully.
468pub async fn apply_migrations_client(
469    db: &Database,
470    kind: String,
471    migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>,
472    module_instance_id: ModuleInstanceId,
473) -> Result<(), anyhow::Error> {
474    // Newly created databases will not have any data underneath the
475    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
476    let mut dbtx = db.begin_transaction_nc().await;
477    let is_new_db = dbtx
478        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
479        .await?
480        .next()
481        .await
482        .is_none();
483
484    let target_version = get_current_database_version(&migrations);
485
486    // First write the database version to disk if it does not exist.
487    create_database_version(
488        db,
489        target_version,
490        Some(module_instance_id),
491        kind.clone(),
492        is_new_db,
493    )
494    .await?;
495
496    let mut global_dbtx = db.begin_transaction().await;
497    let current_version = global_dbtx
498        .get_value(&DatabaseVersionKey(module_instance_id))
499        .await;
500
501    let db_version = if let Some(mut current_version) = current_version {
502        if current_version == target_version {
503            trace!(
504                target: LOG_CLIENT_DB,
505                %current_version,
506                %target_version,
507                module_instance_id,
508                kind,
509                "Database version up to date"
510            );
511            global_dbtx.ignore_uncommitted();
512            return Ok(());
513        }
514
515        if target_version < current_version {
516            return Err(anyhow::anyhow!(format!(
517                "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
518                current_version,
519                target_version,
520            )));
521        }
522
523        info!(
524            target: LOG_CLIENT_DB,
525            %current_version,
526            %target_version,
527            module_instance_id,
528            kind,
529            "Migrating client module database"
530        );
531        let mut active_states =
532            get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
533        let mut inactive_states =
534            get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
535
536        while current_version < target_version {
537            let new_states = if let Some(migration) = migrations.get(&current_version) {
538                debug!(
539                     target: LOG_CLIENT_DB,
540                     module_instance_id,
541                     %kind,
542                     %current_version,
543                     %target_version,
544                     "Running module db migration");
545
546                migration(
547                    &mut global_dbtx
548                        .to_ref_with_prefix_module_id(module_instance_id)
549                        .0
550                        .into_nc(),
551                    active_states.clone(),
552                    inactive_states.clone(),
553                )
554                .await?
555            } else {
556                warn!(
557                    target: LOG_CLIENT_DB,
558                    ?current_version, "Missing client db migration");
559                None
560            };
561
562            // If the client migration returned new states, a state machine migration has
563            // occurred, and the new states need to be persisted to the database.
564            if let Some((new_active_states, new_inactive_states)) = new_states {
565                remove_old_and_persist_new_active_states(
566                    &mut global_dbtx.to_ref_nc(),
567                    new_active_states.clone(),
568                    active_states.clone(),
569                    module_instance_id,
570                )
571                .await;
572                remove_old_and_persist_new_inactive_states(
573                    &mut global_dbtx.to_ref_nc(),
574                    new_inactive_states.clone(),
575                    inactive_states.clone(),
576                    module_instance_id,
577                )
578                .await;
579
580                // the new states become the old states for the next migration
581                active_states = new_active_states;
582                inactive_states = new_inactive_states;
583            }
584
585            current_version = current_version.increment();
586            global_dbtx
587                .insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
588                .await;
589        }
590
591        current_version
592    } else {
593        target_version
594    };
595
596    global_dbtx.commit_tx_result().await?;
597    debug!(
598        target: LOG_CLIENT_DB,
599        ?kind, ?db_version, "Client DB Version");
600    Ok(())
601}
602
603/// Reads all active states from the database and returns `Vec<DynState>`.
604/// TODO: It is unfortunate that we can't read states by the module's instance
605/// id so we are forced to return all active states. Once we do a db migration
606/// to add `module_instance_id` to `ActiveStateKey`, this can be improved to
607/// only read the module's relevant states.
608pub async fn get_active_states(
609    dbtx: &mut DatabaseTransaction<'_>,
610    module_instance_id: ModuleInstanceId,
611) -> Vec<(Vec<u8>, OperationId)> {
612    dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
613        .await
614        .filter_map(|(state, _)| async move {
615            if module_instance_id == state.module_instance_id {
616                Some((state.state, state.operation_id))
617            } else {
618                None
619            }
620        })
621        .collect::<Vec<_>>()
622        .await
623}
624
625/// Reads all inactive states from the database and returns `Vec<DynState>`.
626/// TODO: It is unfortunate that we can't read states by the module's instance
627/// id so we are forced to return all inactive states. Once we do a db migration
628/// to add `module_instance_id` to `InactiveStateKey`, this can be improved to
629/// only read the module's relevant states.
630pub async fn get_inactive_states(
631    dbtx: &mut DatabaseTransaction<'_>,
632    module_instance_id: ModuleInstanceId,
633) -> Vec<(Vec<u8>, OperationId)> {
634    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
635        .await
636        .filter_map(|(state, _)| async move {
637            if module_instance_id == state.module_instance_id {
638                Some((state.state, state.operation_id))
639            } else {
640                None
641            }
642        })
643        .collect::<Vec<_>>()
644        .await
645}
646
647/// Persists new active states by first removing all current active states, and
648/// re-writing with the new set of active states. `new_active_states` is
649/// expected to contain all active states, not just the newly created states.
650pub async fn remove_old_and_persist_new_active_states(
651    dbtx: &mut DatabaseTransaction<'_>,
652    new_active_states: Vec<(Vec<u8>, OperationId)>,
653    states_to_remove: Vec<(Vec<u8>, OperationId)>,
654    module_instance_id: ModuleInstanceId,
655) {
656    // Remove all existing active states
657    for (bytes, operation_id) in states_to_remove {
658        dbtx.remove_entry(&ActiveStateKeyBytes {
659            operation_id,
660            module_instance_id,
661            state: bytes,
662        })
663        .await
664        .expect("Did not delete anything");
665    }
666
667    // Insert new "migrated" active states
668    for (bytes, operation_id) in new_active_states {
669        dbtx.insert_new_entry(
670            &ActiveStateKeyBytes {
671                operation_id,
672                module_instance_id,
673                state: bytes,
674            },
675            &ActiveStateMeta::default(),
676        )
677        .await;
678    }
679}
680
681/// Persists new inactive states by first removing all current inactive states,
682/// and re-writing with the new set of inactive states. `new_inactive_states` is
683/// expected to contain all inactive states, not just the newly created states.
684pub async fn remove_old_and_persist_new_inactive_states(
685    dbtx: &mut DatabaseTransaction<'_>,
686    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
687    states_to_remove: Vec<(Vec<u8>, OperationId)>,
688    module_instance_id: ModuleInstanceId,
689) {
690    // Remove all existing active states
691    for (bytes, operation_id) in states_to_remove {
692        dbtx.remove_entry(&InactiveStateKeyBytes {
693            operation_id,
694            module_instance_id,
695            state: bytes,
696        })
697        .await
698        .expect("Did not delete anything");
699    }
700
701    // Insert new "migrated" inactive states
702    for (bytes, operation_id) in new_inactive_states {
703        dbtx.insert_new_entry(
704            &InactiveStateKeyBytes {
705                operation_id,
706                module_instance_id,
707                state: bytes,
708            },
709            &InactiveStateMeta {
710                created_at: fedimint_core::time::now(),
711                exited_at: fedimint_core::time::now(),
712            },
713        )
714        .await;
715    }
716}
717
718/// Helper function definition for migrating a single state.
719type MigrateStateFn =
720    fn(OperationId, &mut Cursor<&[u8]>) -> anyhow::Result<Option<(Vec<u8>, OperationId)>>;
721
722/// Migrates a particular state by looping over all active and inactive states.
723/// If the `migrate` closure returns `None`, this state was not migrated and
724/// should be added to the new state machine vectors.
725pub fn migrate_state(
726    active_states: Vec<(Vec<u8>, OperationId)>,
727    inactive_states: Vec<(Vec<u8>, OperationId)>,
728    migrate: MigrateStateFn,
729) -> anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>> {
730    let mut new_active_states = Vec::with_capacity(active_states.len());
731    for (active_state, operation_id) in active_states {
732        let bytes = active_state.as_slice();
733
734        let decoders = ModuleDecoderRegistry::default();
735        let mut cursor = std::io::Cursor::new(bytes);
736        let module_instance_id =
737            fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?;
738
739        let state = match migrate(operation_id, &mut cursor)? {
740            Some((mut state, operation_id)) => {
741                let mut final_state = module_instance_id.to_bytes();
742                final_state.append(&mut state);
743                (final_state, operation_id)
744            }
745            None => (active_state, operation_id),
746        };
747
748        new_active_states.push(state);
749    }
750
751    let mut new_inactive_states = Vec::with_capacity(inactive_states.len());
752    for (inactive_state, operation_id) in inactive_states {
753        let bytes = inactive_state.as_slice();
754
755        let decoders = ModuleDecoderRegistry::default();
756        let mut cursor = std::io::Cursor::new(bytes);
757        let module_instance_id =
758            fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?;
759
760        let state = match migrate(operation_id, &mut cursor)? {
761            Some((mut state, operation_id)) => {
762                let mut final_state = module_instance_id.to_bytes();
763                final_state.append(&mut state);
764                (final_state, operation_id)
765            }
766            None => (inactive_state, operation_id),
767        };
768
769        new_inactive_states.push(state);
770    }
771
772    Ok(Some((new_active_states, new_inactive_states)))
773}