1use std::collections::BTreeMap;
2use std::io::Cursor;
3use std::time::SystemTime;
4
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
8use fedimint_core::core::{ModuleInstanceId, OperationId};
9use fedimint_core::db::{
10 apply_migrations, create_database_version, get_current_database_version, CoreMigrationFn,
11 Database, DatabaseTransaction, DatabaseValue, DatabaseVersion, DatabaseVersionKey,
12 IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::module::registry::ModuleDecoderRegistry;
16use fedimint_core::module::SupportedApiVersionsSummary;
17use fedimint_core::util::BoxFuture;
18use fedimint_core::{impl_db_lookup, impl_db_record, PeerId};
19use fedimint_eventlog::{
20 EventLogId, UnordedEventLogId, DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
21};
22use fedimint_logging::LOG_CLIENT_DB;
23use futures::StreamExt;
24use serde::{Deserialize, Serialize};
25use strum_macros::EnumIter;
26use tracing::{debug, info, trace, warn};
27
28use crate::backup::{ClientBackup, Metadata};
29use crate::module::recovery::RecoveryProgress;
30use crate::oplog::{OperationLogEntry, OperationLogEntryV0, OperationOutcome};
31use crate::sm::executor::{
32 ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKeyBytes,
33 InactiveStateKeyPrefixBytes,
34};
35use crate::sm::{ActiveStateMeta, InactiveStateMeta};
36
37#[repr(u8)]
38#[derive(Clone, EnumIter, Debug)]
39pub enum DbKeyPrefix {
40 EncodedClientSecret = 0x28,
41 ClientSecret = 0x29, ClientPreRootSecretHash = 0x2a,
43 OperationLog = 0x2c,
44 ChronologicalOperationLog = 0x2d,
45 CommonApiVersionCache = 0x2e,
46 ClientConfig = 0x2f,
47 ClientInviteCode = 0x30, ClientInitState = 0x31,
49 ClientMetadata = 0x32,
50 ClientLastBackup = 0x33,
51 ClientMetaField = 0x34,
52 ClientMetaServiceInfo = 0x35,
53 ApiSecret = 0x36,
54 PeerLastApiVersionsSummaryCache = 0x37,
55 ApiUrlAnnouncement = 0x38,
56 EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
57 UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
58
59 UserData = 0xb0,
69 ExternalReservedStart = 0xb1,
72 ExternalReservedEnd = 0xcf,
75 InternalReservedStart = 0xd0,
77 ModuleGlobalPrefix = 0xff,
79}
80
81impl std::fmt::Display for DbKeyPrefix {
82 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83 write!(f, "{self:?}")
84 }
85}
86
87#[derive(Debug, Encodable, Decodable)]
88pub struct EncodedClientSecretKey;
89
90#[derive(Debug, Encodable, Decodable)]
91pub struct EncodedClientSecretKeyPrefix;
92
93impl_db_record!(
94 key = EncodedClientSecretKey,
95 value = Vec<u8>,
96 db_prefix = DbKeyPrefix::EncodedClientSecret,
97);
98impl_db_lookup!(
99 key = EncodedClientSecretKey,
100 query_prefix = EncodedClientSecretKeyPrefix
101);
102
103#[derive(Debug, Encodable, Decodable, Serialize)]
104pub struct OperationLogKey {
105 pub operation_id: OperationId,
106}
107
108#[derive(Debug, Encodable)]
109pub struct OperationLogKeyPrefix;
110
111impl_db_record!(
112 key = OperationLogKey,
113 value = OperationLogEntry,
114 db_prefix = DbKeyPrefix::OperationLog
115);
116
117impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);
118
119#[derive(Debug, Encodable, Decodable, Serialize)]
120pub struct OperationLogKeyV0 {
121 pub operation_id: OperationId,
122}
123
124#[derive(Debug, Encodable)]
125pub struct OperationLogKeyPrefixV0;
126
127impl_db_record!(
128 key = OperationLogKeyV0,
129 value = OperationLogEntryV0,
130 db_prefix = DbKeyPrefix::OperationLog
131);
132
133impl_db_lookup!(
134 key = OperationLogKeyV0,
135 query_prefix = OperationLogKeyPrefixV0
136);
137
138#[derive(Debug, Encodable, Decodable, Serialize)]
139pub struct ClientPreRootSecretHashKey;
140
141impl_db_record!(
142 key = ClientPreRootSecretHashKey,
143 value = [u8; 8],
144 db_prefix = DbKeyPrefix::ClientPreRootSecretHash
145);
146
147#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize)]
149pub struct ChronologicalOperationLogKey {
150 pub creation_time: std::time::SystemTime,
151 pub operation_id: OperationId,
152}
153
154#[derive(Debug, Encodable)]
155pub struct ChronologicalOperationLogKeyPrefix;
156
157impl_db_record!(
158 key = ChronologicalOperationLogKey,
159 value = (),
160 db_prefix = DbKeyPrefix::ChronologicalOperationLog
161);
162
163impl_db_lookup!(
164 key = ChronologicalOperationLogKey,
165 query_prefix = ChronologicalOperationLogKeyPrefix
166);
167
168#[derive(Debug, Encodable, Decodable)]
169pub struct CachedApiVersionSetKey;
170
171#[derive(Debug, Encodable, Decodable)]
172pub struct CachedApiVersionSet(pub ApiVersionSet);
173
174impl_db_record!(
175 key = CachedApiVersionSetKey,
176 value = CachedApiVersionSet,
177 db_prefix = DbKeyPrefix::CommonApiVersionCache
178);
179
180#[derive(Debug, Encodable, Decodable)]
181pub struct PeerLastApiVersionsSummaryKey(pub PeerId);
182
183#[derive(Debug, Encodable, Decodable)]
184pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);
185
186impl_db_record!(
187 key = PeerLastApiVersionsSummaryKey,
188 value = PeerLastApiVersionsSummary,
189 db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
190);
191
192#[derive(Debug, Encodable, Decodable, Serialize)]
193pub struct ClientConfigKey;
194
195impl_db_record!(
196 key = ClientConfigKey,
197 value = ClientConfig,
198 db_prefix = DbKeyPrefix::ClientConfig
199);
200
201#[derive(Debug, Encodable, Decodable, Serialize)]
202pub struct ClientConfigKeyV0 {
203 pub id: FederationId,
204}
205
206#[derive(Debug, Encodable)]
207pub struct ClientConfigKeyPrefixV0;
208
209impl_db_record!(
210 key = ClientConfigKeyV0,
211 value = ClientConfigV0,
212 db_prefix = DbKeyPrefix::ClientConfig
213);
214
215impl_db_lookup!(
216 key = ClientConfigKeyV0,
217 query_prefix = ClientConfigKeyPrefixV0
218);
219
220#[derive(Debug, Encodable, Decodable, Serialize)]
221pub struct ApiSecretKey;
222
223#[derive(Debug, Encodable)]
224pub struct ApiSecretKeyPrefix;
225
226impl_db_record!(
227 key = ApiSecretKey,
228 value = String,
229 db_prefix = DbKeyPrefix::ApiSecret
230);
231
232impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
233
234#[derive(Debug, Encodable, Decodable, Serialize)]
236pub struct ClientMetadataKey;
237
238#[derive(Debug, Encodable)]
239pub struct ClientMetadataPrefix;
240
241impl_db_record!(
242 key = ClientMetadataKey,
243 value = Metadata,
244 db_prefix = DbKeyPrefix::ClientMetadata
245);
246
247impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
248
249#[derive(Debug, Encodable, Decodable, Serialize)]
251pub struct ClientInitStateKey;
252
253#[derive(Debug, Encodable)]
254pub struct ClientInitStatePrefix;
255
256#[derive(Debug, Encodable, Decodable)]
258pub enum InitMode {
259 Fresh,
261 Recover { snapshot: Option<ClientBackup> },
264}
265
266#[derive(Debug, Encodable, Decodable)]
272pub enum InitModeComplete {
273 Fresh,
274 Recover,
275}
276
277#[derive(Debug, Encodable, Decodable)]
279pub enum InitState {
280 Pending(InitMode),
283 Complete(InitModeComplete),
285}
286
287impl InitState {
288 pub fn into_complete(self) -> Self {
289 match self {
290 InitState::Pending(p) => InitState::Complete(match p {
291 InitMode::Fresh => InitModeComplete::Fresh,
292 InitMode::Recover { .. } => InitModeComplete::Recover,
293 }),
294 InitState::Complete(t) => InitState::Complete(t),
295 }
296 }
297
298 pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
299 match self {
300 InitState::Pending(p) => match p {
301 InitMode::Fresh => None,
302 InitMode::Recover { snapshot } => Some(snapshot.clone()),
303 },
304 InitState::Complete(_) => None,
305 }
306 }
307
308 pub fn is_pending(&self) -> bool {
309 match self {
310 InitState::Pending(_) => true,
311 InitState::Complete(_) => false,
312 }
313 }
314}
315
316impl_db_record!(
317 key = ClientInitStateKey,
318 value = InitState,
319 db_prefix = DbKeyPrefix::ClientInitState
320);
321
322impl_db_lookup!(
323 key = ClientInitStateKey,
324 query_prefix = ClientInitStatePrefix
325);
326
327#[derive(Debug, Encodable, Decodable, Serialize)]
328pub struct ClientRecoverySnapshot;
329
330#[derive(Debug, Encodable, Decodable, Serialize)]
331pub struct ClientRecoverySnapshotPrefix;
332
333impl_db_record!(
334 key = ClientRecoverySnapshot,
335 value = Option<ClientBackup>,
336 db_prefix = DbKeyPrefix::ClientInitState
337);
338
339impl_db_lookup!(
340 key = ClientRecoverySnapshot,
341 query_prefix = ClientRecoverySnapshotPrefix
342);
343
344#[derive(Debug, Encodable, Decodable, Serialize)]
345pub struct ClientModuleRecovery {
346 pub module_instance_id: ModuleInstanceId,
347}
348
349#[derive(Debug, Encodable)]
350pub struct ClientModuleRecoveryPrefix;
351
352#[derive(Debug, Clone, Encodable, Decodable)]
353pub struct ClientModuleRecoveryState {
354 pub progress: RecoveryProgress,
355}
356
357impl ClientModuleRecoveryState {
358 pub fn is_done(&self) -> bool {
359 self.progress.is_done()
360 }
361}
362
363impl_db_record!(
364 key = ClientModuleRecovery,
365 value = ClientModuleRecoveryState,
366 db_prefix = DbKeyPrefix::ClientInitState,
367);
368
369impl_db_lookup!(
370 key = ClientModuleRecovery,
371 query_prefix = ClientModuleRecoveryPrefix
372);
373
374#[derive(Debug, Encodable, Decodable)]
379pub struct LastBackupKey;
380
381impl_db_record!(
382 key = LastBackupKey,
383 value = ClientBackup,
384 db_prefix = DbKeyPrefix::ClientLastBackup
385);
386
387#[derive(
388 Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
389)]
390pub struct MetaFieldKey(pub String);
391
392#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
393pub struct MetaFieldPrefix;
394
395#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
396pub struct MetaFieldValue(pub String);
397
398#[derive(Encodable, Decodable, Debug)]
399pub struct MetaServiceInfoKey;
400
401#[derive(Encodable, Decodable, Debug)]
402pub struct MetaServiceInfo {
403 pub last_updated: SystemTime,
404 pub revision: u64,
405}
406
407impl_db_record!(
408 key = MetaFieldKey,
409 value = MetaFieldValue,
410 db_prefix = DbKeyPrefix::ClientMetaField
411);
412
413impl_db_record!(
414 key = MetaServiceInfoKey,
415 value = MetaServiceInfo,
416 db_prefix = DbKeyPrefix::ClientMetaServiceInfo
417);
418
419impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
420
421pub type ClientMigrationFn = for<'r, 'tx> fn(
424 &'r mut DatabaseTransaction<'tx>,
425 Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>, ) -> BoxFuture<
428 'r,
429 anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>>,
430>;
431
/// Builds the map of core client database migrations, keyed by the database
/// version each migration starts from.
///
/// Three migrations are registered:
///
/// * `DatabaseVersion(0)`: converts the stored `ClientConfigV0` record (if
///   any) into a `ClientConfig` stored under `ClientConfigKey`.
/// * `DatabaseVersion(1)`: rewrites every `OperationLogEntryV0` as an
///   `OperationLogEntry`, attaching a time to each outcome.
/// * `DatabaseVersion(2)`: moves event log entries that were written under
///   the wrong (ordered vs. unordered) event log prefix to the other prefix.
///
/// # Panics
///
/// The migration closures panic if more than one `ClientConfigV0` record
/// exists, if a raw database operation fails, or if an event log key decodes
/// as neither an `EventLogId` nor an `UnordedEventLogId`.
pub fn get_core_client_database_migrations() -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
    migrations.insert(DatabaseVersion(0), |mut ctx| {
        Box::pin(async move {
            let mut dbtx = ctx.dbtx();

            // Load every record stored under the V0 config key prefix.
            let config_v0 = dbtx
                .find_by_prefix(&ClientConfigKeyPrefixV0)
                .await
                .collect::<Vec<_>>()
                .await;

            // At most one V0 config is expected; with none there is nothing to do.
            assert!(config_v0.len() <= 1);
            let Some((id, config_v0)) = config_v0.into_iter().next() else {
                return Ok(());
            };

            // Carry over the V0 global fields; `broadcast_public_keys` is not
            // taken from the V0 config and is set to `None`.
            let global = GlobalClientConfig {
                api_endpoints: config_v0.global.api_endpoints,
                broadcast_public_keys: None,
                consensus_version: config_v0.global.consensus_version,
                meta: config_v0.global.meta,
            };

            let config = ClientConfig {
                global,
                modules: config_v0.modules,
            };

            // Replace the V0 record with the new record under `ClientConfigKey`.
            dbtx.remove_entry(&id).await;
            dbtx.insert_new_entry(&ClientConfigKey, &config).await;
            Ok(())
        })
    });

    migrations.insert(DatabaseVersion(1), |mut ctx| {
        Box::pin(async move {
            let mut dbtx = ctx.dbtx();

            // Load all V0 operation log entries up front.
            let operation_logs = dbtx
                .find_by_prefix(&OperationLogKeyPrefixV0)
                .await
                .collect::<Vec<_>>()
                .await;

            // Maps each operation id to the largest `exited_at` seen among
            // its inactive states.
            let mut op_id_max_time = BTreeMap::new();

            {
                let mut inactive_states_stream =
                    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                while let Some((state, meta)) = inactive_states_stream.next().await {
                    let entry = op_id_max_time
                        .entry(state.operation_id)
                        .or_insert(meta.exited_at);
                    *entry = (*entry).max(meta.exited_at);
                }
            }
            // Rewrite each V0 entry in the new format.
            for (op_key_v0, log_entry_v0) in operation_logs {
                let new_entry = OperationLogEntry {
                    operation_module_kind: log_entry_v0.operation_module_kind,
                    meta: log_entry_v0.meta,
                    outcome: log_entry_v0.outcome.map(|outcome| {
                        OperationOutcome {
                            outcome,
                            // Use the largest `exited_at` recorded for this
                            // operation; fall back to the current time when
                            // the operation has no inactive state.
                            time: op_id_max_time
                                .get(&op_key_v0.operation_id)
                                .copied()
                                .unwrap_or_else(fedimint_core::time::now),
                        }
                    }),
                };

                dbtx.remove_entry(&op_key_v0).await;
                dbtx.insert_entry(
                    &OperationLogKey {
                        operation_id: op_key_v0.operation_id,
                    },
                    &new_entry,
                )
                .await;
            }

            Ok(())
        })
    });

    migrations.insert(DatabaseVersion(2), |mut ctx| {
        Box::pin(async move {
            let mut dbtx = ctx.dbtx();

            // Pass 1: find keys under the ordered event log prefix that do
            // not decode as `EventLogId` and move them to the unordered
            // prefix.
            {
                let mut ordered_log_entries = dbtx
                    .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                    .await
                    .expect("DB operation failed");
                let mut keys_to_migrate = vec![];
                while let Some((k, _v)) = ordered_log_entries.next().await {
                    trace!(target: LOG_CLIENT_DB,
                        k=%k.as_hex(),
                        "Checking ordered log key"
                    );
                    // `k[0]` is the prefix byte; the rest is the encoded id.
                    if EventLogId::consensus_decode_whole(
                        &k[1..],
                        &ModuleDecoderRegistry::default(),
                    )
                    .is_err()
                    {
                        // A key that is not an ordered id must be an unordered one.
                        assert!(UnordedEventLogId::consensus_decode_whole(
                            &k[1..],
                            &ModuleDecoderRegistry::default()
                        )
                        .is_ok());
                        keys_to_migrate.push(k);
                    }
                }
                // Release the stream before writing through `dbtx` again.
                drop(ordered_log_entries);
                for mut key_to_migrate in keys_to_migrate {
                    warn!(target: LOG_CLIENT_DB,
                        k=%key_to_migrate.as_hex(),
                        "Migrating unordered event log entry written to an ordered log"
                    );
                    let v = dbtx
                        .raw_remove_entry(&key_to_migrate)
                        .await
                        .expect("DB operation failed")
                        .expect("Was there a moment ago");
                    // Swap the prefix byte; the asserts pin the expected
                    // prefix values before and after the change.
                    assert_eq!(key_to_migrate[0], 0x39);
                    key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                    assert_eq!(key_to_migrate[0], 0x3a);
                    dbtx.raw_insert_bytes(&key_to_migrate, &v)
                        .await
                        .expect("DB operation failed");
                }
            }

            // Pass 2: the mirror image — keys under the unordered prefix that
            // do not decode as `UnordedEventLogId` are moved to the ordered
            // prefix.
            {
                let mut unordered_log_entries = dbtx
                    .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                    .await
                    .expect("DB operation failed");
                let mut keys_to_migrate = vec![];
                while let Some((k, _v)) = unordered_log_entries.next().await {
                    trace!(target: LOG_CLIENT_DB,
                        k=%k.as_hex(),
                        "Checking unordered log key"
                    );
                    if UnordedEventLogId::consensus_decode_whole(
                        &k[1..],
                        &ModuleDecoderRegistry::default(),
                    )
                    .is_err()
                    {
                        // A key that is not an unordered id must be an ordered one.
                        assert!(EventLogId::consensus_decode_whole(
                            &k[1..],
                            &ModuleDecoderRegistry::default()
                        )
                        .is_ok());
                        keys_to_migrate.push(k);
                    }
                }
                // Release the stream before writing through `dbtx` again.
                drop(unordered_log_entries);
                for mut key_to_migrate in keys_to_migrate {
                    warn!(target: LOG_CLIENT_DB,
                        k=%key_to_migrate.as_hex(),
                        "Migrating ordered event log entry written to an unordered log"
                    );
                    let v = dbtx
                        .raw_remove_entry(&key_to_migrate)
                        .await
                        .expect("DB operation failed")
                        .expect("Was there a moment ago");
                    assert_eq!(key_to_migrate[0], 0x3a);
                    key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                    assert_eq!(key_to_migrate[0], 0x39);
                    dbtx.raw_insert_bytes(&key_to_migrate, &v)
                        .await
                        .expect("DB operation failed");
                }
            }
            Ok(())
        })
    });
    migrations
}
627
628pub async fn apply_migrations_core_client(
629 db: &Database,
630 kind: String,
631 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
632) -> Result<(), anyhow::Error> {
633 apply_migrations(
634 db,
635 kind,
636 migrations,
637 None,
638 Some(DbKeyPrefix::UserData as u8),
639 )
640 .await
641}
642
643pub async fn apply_migrations_client(
653 db: &Database,
654 kind: String,
655 migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>,
656 module_instance_id: ModuleInstanceId,
657) -> Result<(), anyhow::Error> {
658 let mut dbtx = db.begin_transaction_nc().await;
661 let is_new_db = dbtx
662 .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
663 .await?
664 .next()
665 .await
666 .is_none();
667
668 let target_version = get_current_database_version(&migrations);
669
670 create_database_version(
672 db,
673 target_version,
674 Some(module_instance_id),
675 kind.clone(),
676 is_new_db,
677 )
678 .await?;
679
680 let mut global_dbtx = db.begin_transaction().await;
681 let current_version = global_dbtx
682 .get_value(&DatabaseVersionKey(module_instance_id))
683 .await;
684
685 let db_version = if let Some(mut current_version) = current_version {
686 if current_version == target_version {
687 trace!(
688 target: LOG_CLIENT_DB,
689 %current_version,
690 %target_version,
691 module_instance_id,
692 kind,
693 "Database version up to date"
694 );
695 global_dbtx.ignore_uncommitted();
696 return Ok(());
697 }
698
699 if target_version < current_version {
700 return Err(anyhow::anyhow!(format!(
701 "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
702 current_version,
703 target_version,
704 )));
705 }
706
707 info!(
708 target: LOG_CLIENT_DB,
709 %current_version,
710 %target_version,
711 module_instance_id,
712 kind,
713 "Migrating client module database"
714 );
715 let mut active_states =
716 get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
717 let mut inactive_states =
718 get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
719
720 while current_version < target_version {
721 let new_states = if let Some(migration) = migrations.get(¤t_version) {
722 debug!(
723 target: LOG_CLIENT_DB,
724 module_instance_id,
725 %kind,
726 %current_version,
727 %target_version,
728 "Running module db migration");
729
730 migration(
731 &mut global_dbtx
732 .to_ref_with_prefix_module_id(module_instance_id)
733 .0
734 .into_nc(),
735 active_states.clone(),
736 inactive_states.clone(),
737 )
738 .await?
739 } else {
740 warn!(
741 target: LOG_CLIENT_DB,
742 ?current_version, "Missing client db migration");
743 None
744 };
745
746 if let Some((new_active_states, new_inactive_states)) = new_states {
749 remove_old_and_persist_new_active_states(
750 &mut global_dbtx.to_ref_nc(),
751 new_active_states.clone(),
752 active_states.clone(),
753 module_instance_id,
754 )
755 .await;
756 remove_old_and_persist_new_inactive_states(
757 &mut global_dbtx.to_ref_nc(),
758 new_inactive_states.clone(),
759 inactive_states.clone(),
760 module_instance_id,
761 )
762 .await;
763
764 active_states = new_active_states;
766 inactive_states = new_inactive_states;
767 }
768
769 current_version = current_version.increment();
770 global_dbtx
771 .insert_entry(&DatabaseVersionKey(module_instance_id), ¤t_version)
772 .await;
773 }
774
775 current_version
776 } else {
777 target_version
778 };
779
780 global_dbtx.commit_tx_result().await?;
781 debug!(
782 target: LOG_CLIENT_DB,
783 ?kind, ?db_version, "Client DB Version");
784 Ok(())
785}
786
787pub async fn get_active_states(
793 dbtx: &mut DatabaseTransaction<'_>,
794 module_instance_id: ModuleInstanceId,
795) -> Vec<(Vec<u8>, OperationId)> {
796 dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
797 .await
798 .filter_map(|(state, _)| async move {
799 if module_instance_id == state.module_instance_id {
800 Some((state.state, state.operation_id))
801 } else {
802 None
803 }
804 })
805 .collect::<Vec<_>>()
806 .await
807}
808
809pub async fn get_inactive_states(
815 dbtx: &mut DatabaseTransaction<'_>,
816 module_instance_id: ModuleInstanceId,
817) -> Vec<(Vec<u8>, OperationId)> {
818 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
819 .await
820 .filter_map(|(state, _)| async move {
821 if module_instance_id == state.module_instance_id {
822 Some((state.state, state.operation_id))
823 } else {
824 None
825 }
826 })
827 .collect::<Vec<_>>()
828 .await
829}
830
831pub async fn remove_old_and_persist_new_active_states(
835 dbtx: &mut DatabaseTransaction<'_>,
836 new_active_states: Vec<(Vec<u8>, OperationId)>,
837 states_to_remove: Vec<(Vec<u8>, OperationId)>,
838 module_instance_id: ModuleInstanceId,
839) {
840 for (bytes, operation_id) in states_to_remove {
842 dbtx.remove_entry(&ActiveStateKeyBytes {
843 operation_id,
844 module_instance_id,
845 state: bytes,
846 })
847 .await
848 .expect("Did not delete anything");
849 }
850
851 for (bytes, operation_id) in new_active_states {
853 dbtx.insert_new_entry(
854 &ActiveStateKeyBytes {
855 operation_id,
856 module_instance_id,
857 state: bytes,
858 },
859 &ActiveStateMeta::default(),
860 )
861 .await;
862 }
863}
864
865pub async fn remove_old_and_persist_new_inactive_states(
869 dbtx: &mut DatabaseTransaction<'_>,
870 new_inactive_states: Vec<(Vec<u8>, OperationId)>,
871 states_to_remove: Vec<(Vec<u8>, OperationId)>,
872 module_instance_id: ModuleInstanceId,
873) {
874 for (bytes, operation_id) in states_to_remove {
876 dbtx.remove_entry(&InactiveStateKeyBytes {
877 operation_id,
878 module_instance_id,
879 state: bytes,
880 })
881 .await
882 .expect("Did not delete anything");
883 }
884
885 for (bytes, operation_id) in new_inactive_states {
887 dbtx.insert_new_entry(
888 &InactiveStateKeyBytes {
889 operation_id,
890 module_instance_id,
891 state: bytes,
892 },
893 &InactiveStateMeta {
894 created_at: fedimint_core::time::now(),
895 exited_at: fedimint_core::time::now(),
896 },
897 )
898 .await;
899 }
900}
901
902type MigrateStateFn =
904 fn(OperationId, &mut Cursor<&[u8]>) -> anyhow::Result<Option<(Vec<u8>, OperationId)>>;
905
906pub fn migrate_state(
910 active_states: Vec<(Vec<u8>, OperationId)>,
911 inactive_states: Vec<(Vec<u8>, OperationId)>,
912 migrate: MigrateStateFn,
913) -> anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>> {
914 let mut new_active_states = Vec::with_capacity(active_states.len());
915 for (active_state, operation_id) in active_states {
916 let bytes = active_state.as_slice();
917
918 let decoders = ModuleDecoderRegistry::default();
919 let mut cursor = std::io::Cursor::new(bytes);
920 let module_instance_id = fedimint_core::core::ModuleInstanceId::consensus_decode_partial(
921 &mut cursor,
922 &decoders,
923 )?;
924
925 let state = match migrate(operation_id, &mut cursor)? {
926 Some((mut state, operation_id)) => {
927 let mut final_state = module_instance_id.to_bytes();
928 final_state.append(&mut state);
929 (final_state, operation_id)
930 }
931 None => (active_state, operation_id),
932 };
933
934 new_active_states.push(state);
935 }
936
937 let mut new_inactive_states = Vec::with_capacity(inactive_states.len());
938 for (inactive_state, operation_id) in inactive_states {
939 let bytes = inactive_state.as_slice();
940
941 let decoders = ModuleDecoderRegistry::default();
942 let mut cursor = std::io::Cursor::new(bytes);
943 let module_instance_id = fedimint_core::core::ModuleInstanceId::consensus_decode_partial(
944 &mut cursor,
945 &decoders,
946 )?;
947
948 let state = match migrate(operation_id, &mut cursor)? {
949 Some((mut state, operation_id)) => {
950 let mut final_state = module_instance_id.to_bytes();
951 final_state.append(&mut state);
952 (final_state, operation_id)
953 }
954 None => (inactive_state, operation_id),
955 };
956
957 new_inactive_states.push(state);
958 }
959
960 Ok(Some((new_active_states, new_inactive_states)))
961}