use std::collections::BTreeMap;
use std::time::SystemTime;
use fedimint_core::api::ApiVersionSet;
use fedimint_core::config::{ClientConfig, FederationId};
use fedimint_core::core::{ModuleInstanceId, OperationId};
use fedimint_core::db::{
migrate_database_version, Database, DatabaseTransaction, DatabaseValue, DatabaseVersion,
DatabaseVersionKey, IDatabaseTransactionOpsCoreTyped,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::util::BoxFuture;
use fedimint_core::{impl_db_lookup, impl_db_record};
use fedimint_logging::LOG_DB;
use futures::StreamExt;
use serde::Serialize;
use strum_macros::EnumIter;
use tracing::{info, warn};
use crate::backup::{ClientBackup, Metadata};
use crate::module::recovery::RecoveryProgress;
use crate::oplog::OperationLogEntry;
use crate::sm::executor::{
ActiveStateKey, ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKey,
InactiveStateKeyBytes, InactiveStateKeyPrefixBytes,
};
use crate::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
/// Single-byte key prefixes used by the client database record types.
///
/// Every `impl_db_record!` invocation in this file names one of these variants
/// as its `db_prefix`. Discriminants are assigned explicitly and the enum is
/// `#[repr(u8)]`.
///
/// NOTE(review): the values are presumably part of the persisted key encoding,
/// so existing discriminants should not be renumbered — confirm against
/// `impl_db_record!`.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
/// Prefix of the `EncodedClientSecretKey` record.
EncodedClientSecret = 0x28,
// `ClientSecret` (0x29) is not referenced by any record in the visible part of
// this file (NOTE(review): presumably legacy — confirm before reusing it).
// `OperationLog` (0x2c) is the prefix of the `OperationLogKey` record.
ClientSecret = 0x29, OperationLog = 0x2c,
/// Prefix of the `ChronologicalOperationLogKey` record.
ChronologicalOperationLog = 0x2d,
/// Prefix of the `CachedApiVersionSetKey` record.
CommonApiVersionCache = 0x2e,
/// Prefix of the `ClientConfigKey` record.
ClientConfig = 0x2f,
// `ClientInviteCode` (0x30) is not referenced by any record in the visible part
// of this file (NOTE(review): presumably legacy — confirm before reusing it).
// `ClientInitState` (0x31) is used by three record types in this file:
// `ClientInitStateKey`, `ClientRecoverySnapshot` and `ClientModuleRecovery`.
ClientInviteCode = 0x30, ClientInitState = 0x31,
/// Prefix of the `ClientMetadataKey` record.
ClientMetadata = 0x32,
/// Prefix of the `LastBackupKey` record.
ClientLastBackup = 0x33,
/// Prefix of the `MetaFieldKey` record.
ClientMetaField = 0x34,
/// Prefix of the `MetaServiceInfoKey` record.
ClientMetaServiceInfo = 0x35,
/// NOTE(review): no record in this file uses this prefix; the name suggests
/// it is meant for data owned by the integrating application — confirm.
UserData = 0xb0,
/// NOTE(review): together with `ExternalReservedEnd` this presumably marks a
/// range (0xb1..=0xcf) set aside for external use — confirm.
ExternalReservedStart = 0xb1,
/// NOTE(review): presumably the inclusive end of the externally reserved
/// range — confirm.
ExternalReservedEnd = 0xcf,
/// NOTE(review): presumably the first prefix of a range reserved for internal
/// use — confirm.
InternalReservedStart = 0xd0,
/// NOTE(review): presumably the prefix under which per-module-instance data
/// is namespaced — confirm against the module-prefixed transaction helpers.
ModuleGlobalPrefix = 0xff,
}
impl std::fmt::Display for DbKeyPrefix {
    /// Renders the prefix using its `Debug` representation, i.e. the variant
    /// name.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Fresh format arguments are built here, so width/fill flags set on `f`
        // are not forwarded to the `Debug` output.
        f.write_fmt(format_args!("{self:?}"))
    }
}
/// Key of the record stored under `DbKeyPrefix::EncodedClientSecret`.
///
/// The key carries no fields; the associated value is a raw `Vec<u8>`.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;
/// Query prefix registered for `EncodedClientSecretKey` lookups.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;
// Record definition: `EncodedClientSecretKey` -> `Vec<u8>`.
impl_db_record!(
key = EncodedClientSecretKey,
value = Vec<u8>,
db_prefix = DbKeyPrefix::EncodedClientSecret,
);
// Registers `EncodedClientSecretKeyPrefix` as the query prefix for the key.
impl_db_lookup!(
key = EncodedClientSecretKey,
query_prefix = EncodedClientSecretKeyPrefix
);
/// Key of an operation log record, stored under `DbKeyPrefix::OperationLog`.
///
/// The associated value is an `OperationLogEntry`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
/// Identifier of the operation the log entry belongs to.
pub operation_id: OperationId,
}
// Record definition: `OperationLogKey` -> `OperationLogEntry`.
impl_db_record!(
key = OperationLogKey,
value = OperationLogEntry,
db_prefix = DbKeyPrefix::OperationLog
);
/// Key of a value-less record stored under
/// `DbKeyPrefix::ChronologicalOperationLog`.
///
/// The value type is `()`, so all information lives in the key itself.
/// NOTE(review): `creation_time` is the first field, so prefix scans
/// presumably return operations ordered by creation time — confirm against
/// the `SystemTime` encoding.
#[derive(Debug, Clone, Copy, Encodable, Decodable, Serialize)]
pub struct ChronologicalOperationLogKey {
/// Creation timestamp of the operation.
pub creation_time: std::time::SystemTime,
/// Identifier of the operation.
pub operation_id: OperationId,
}
/// Query prefix registered for `ChronologicalOperationLogKey` lookups.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;
// Record definition: `ChronologicalOperationLogKey` -> `()`.
impl_db_record!(
key = ChronologicalOperationLogKey,
value = (),
db_prefix = DbKeyPrefix::ChronologicalOperationLog
);
// Registers `ChronologicalOperationLogKeyPrefix` as the query prefix for the key.
impl_db_lookup!(
key = ChronologicalOperationLogKey,
query_prefix = ChronologicalOperationLogKeyPrefix
);
/// Key of the record stored under `DbKeyPrefix::CommonApiVersionCache`.
///
/// The key carries no fields.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;
/// Value stored under `CachedApiVersionSetKey`: a newtype wrapper around an
/// `ApiVersionSet`.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);
// Record definition: `CachedApiVersionSetKey` -> `CachedApiVersionSet`.
impl_db_record!(
key = CachedApiVersionSetKey,
value = CachedApiVersionSet,
db_prefix = DbKeyPrefix::CommonApiVersionCache
);
/// Key of a `ClientConfig` record, stored under `DbKeyPrefix::ClientConfig`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey {
/// Identifier of the federation the stored config belongs to.
pub id: FederationId,
}
/// Query prefix registered for `ClientConfigKey` lookups.
#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefix;
// Record definition: `ClientConfigKey` -> `ClientConfig`.
impl_db_record!(
key = ClientConfigKey,
value = ClientConfig,
db_prefix = DbKeyPrefix::ClientConfig
);
// Registers `ClientConfigKeyPrefix` as the query prefix for the key.
impl_db_lookup!(key = ClientConfigKey, query_prefix = ClientConfigKeyPrefix);
/// Key of the `Metadata` record stored under `DbKeyPrefix::ClientMetadata`.
///
/// The key carries no fields.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;
/// Query prefix registered for `ClientMetadataKey` lookups.
#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;
// Record definition: `ClientMetadataKey` -> `Metadata`.
impl_db_record!(
key = ClientMetadataKey,
value = Metadata,
db_prefix = DbKeyPrefix::ClientMetadata
);
// Registers `ClientMetadataPrefix` as the query prefix for the key.
impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
/// Key of the `InitState` record stored under `DbKeyPrefix::ClientInitState`.
///
/// The key carries no fields.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;
/// Query prefix registered for `ClientInitStateKey` lookups.
#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;
/// How a client that has not finished initializing was set up.
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
/// Initialization without recovery.
Fresh,
/// Initialization that requires recovery, optionally carrying a
/// `ClientBackup` snapshot.
Recover { snapshot: Option<ClientBackup> },
}
/// The mode a completed initialization was performed in; mirrors `InitMode`
/// without the snapshot payload.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
/// Initialization finished without recovery.
Fresh,
/// Initialization finished in recovery mode.
Recover,
}
/// Initialization state of the client, persisted under `ClientInitStateKey`.
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
/// Initialization has not completed yet; carries the requested mode.
Pending(InitMode),
/// Initialization has completed; carries the mode it completed in.
Complete(InitModeComplete),
}
impl InitState {
    /// Turns this state into its `Complete` form.
    ///
    /// A pending `Fresh`/`Recover` mode maps to the matching
    /// `InitModeComplete` variant (any snapshot is discarded); an already
    /// complete state is returned unchanged.
    pub fn into_complete(self) -> Self {
        let finished = match self {
            InitState::Complete(done) => done,
            InitState::Pending(InitMode::Fresh) => InitModeComplete::Fresh,
            InitState::Pending(InitMode::Recover { .. }) => InitModeComplete::Recover,
        };
        InitState::Complete(finished)
    }

    /// Reports whether recovery still has to run.
    ///
    /// Returns `Some(snapshot)` (a clone of the stored optional snapshot) only
    /// for a pending recovery, and `None` for a pending fresh start or any
    /// completed state.
    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
        match self {
            InitState::Pending(InitMode::Recover { snapshot }) => Some(snapshot.clone()),
            InitState::Pending(InitMode::Fresh) | InitState::Complete(_) => None,
        }
    }

    /// Returns `true` while the state is `Pending`.
    pub fn is_pending(&self) -> bool {
        matches!(self, InitState::Pending(_))
    }
}
// Record definition: `ClientInitStateKey` -> `InitState`, stored under the
// `ClientInitState` prefix.
impl_db_record!(
key = ClientInitStateKey,
value = InitState,
db_prefix = DbKeyPrefix::ClientInitState
);
// Registers `ClientInitStatePrefix` as the query prefix for the key.
impl_db_lookup!(
key = ClientInitStateKey,
query_prefix = ClientInitStatePrefix
);
/// Key of the record holding an optional `ClientBackup` recovery snapshot.
///
/// The key carries no fields.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshot;
/// Query prefix registered for `ClientRecoverySnapshot` lookups.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshotPrefix;
// Record definition: `ClientRecoverySnapshot` -> `Option<ClientBackup>`.
// NOTE(review): this record uses `DbKeyPrefix::ClientInitState`, the same
// prefix as `ClientInitStateKey` and `ClientModuleRecovery`. Verify the
// sharing is intentional; changing it would alter where existing data is
// looked up, so do not "fix" it without a migration.
impl_db_record!(
key = ClientRecoverySnapshot,
value = Option<ClientBackup>,
db_prefix = DbKeyPrefix::ClientInitState
);
// Registers `ClientRecoverySnapshotPrefix` as the query prefix for the key.
impl_db_lookup!(
key = ClientRecoverySnapshot,
query_prefix = ClientRecoverySnapshotPrefix
);
/// Key of a per-module recovery state record (`ClientModuleRecoveryState`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
/// Module instance whose recovery state is stored under this key.
pub module_instance_id: ModuleInstanceId,
}
/// Query prefix registered for `ClientModuleRecovery` lookups.
#[derive(Debug, Encodable)]
pub struct ClientModuleRecoveryPrefix;
/// Value stored under `ClientModuleRecovery`: the recovery progress of one
/// module instance.
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
/// Progress reported for the module's recovery.
pub progress: RecoveryProgress,
}
impl ClientModuleRecoveryState {
    /// Returns whether the wrapped `RecoveryProgress` reports completion.
    pub fn is_done(&self) -> bool {
        let Self { progress } = self;
        progress.is_done()
    }
}
// Record definition: `ClientModuleRecovery` -> `ClientModuleRecoveryState`.
// NOTE(review): this record uses `DbKeyPrefix::ClientInitState`, the same
// prefix as `ClientInitStateKey` and `ClientRecoverySnapshot`. Verify the
// sharing is intentional; changing it would alter where existing data is
// looked up, so do not "fix" it without a migration.
impl_db_record!(
key = ClientModuleRecovery,
value = ClientModuleRecoveryState,
db_prefix = DbKeyPrefix::ClientInitState,
);
// Registers `ClientModuleRecoveryPrefix` as the query prefix for the key.
impl_db_lookup!(
key = ClientModuleRecovery,
query_prefix = ClientModuleRecoveryPrefix
);
/// Key of the `ClientBackup` record stored under
/// `DbKeyPrefix::ClientLastBackup`.
///
/// The key carries no fields.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;
// Record definition: `LastBackupKey` -> `ClientBackup`.
impl_db_record!(
key = LastBackupKey,
value = ClientBackup,
db_prefix = DbKeyPrefix::ClientLastBackup
);
/// Key of a meta field record: the field name as a `String`.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct MetaFieldKey(pub String);
/// Query prefix registered for `MetaFieldKey` lookups.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct MetaFieldPrefix;
/// Value of a meta field record, stored as a `String`.
#[derive(Encodable, Decodable, Debug, Clone)]
pub struct MetaFieldValue(pub String);
/// Key of the `MetaServiceInfo` record. The key carries no fields.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;
/// Bookkeeping data stored under `MetaServiceInfoKey`.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
/// Timestamp of the last update.
/// NOTE(review): presumably when the meta fields were last refreshed — confirm
/// against the code that writes this record.
pub last_updated: SystemTime,
/// Revision counter.
/// NOTE(review): presumably the revision of the stored meta fields — confirm.
pub revision: u64,
}
// Record definition: `MetaFieldKey` -> `MetaFieldValue`.
impl_db_record!(
key = MetaFieldKey,
value = MetaFieldValue,
db_prefix = DbKeyPrefix::ClientMetaField
);
// Record definition: `MetaServiceInfoKey` -> `MetaServiceInfo`.
impl_db_record!(
key = MetaServiceInfoKey,
value = MetaServiceInfo,
db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);
// Registers `MetaFieldPrefix` as the query prefix for `MetaFieldKey`.
impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
/// Signature of a single client-module database migration step.
///
/// `apply_migrations_client` calls the function with, in order:
/// a database transaction, the module instance id, the module's active states
/// and inactive states (each as `(encoded state bytes, operation id)` pairs),
/// and the module decoder registry.
///
/// The returned future resolves to:
/// * `Ok(None)` — the stored state machine states are left untouched,
/// * `Ok(Some((active, inactive)))` — the stored active/inactive states of the
///   module are replaced by the returned ones,
/// * `Err(_)` — the migration run is aborted with that error.
pub type ClientMigrationFn =
for<'r, 'tx> fn(
// Transaction the migration reads from and writes to.
&'r mut DatabaseTransaction<'tx>,
// Module instance being migrated.
ModuleInstanceId,
// Active states, inactive states, and the decoder registry (in that order).
Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>, ModuleDecoderRegistry,
) -> BoxFuture<'r, anyhow::Result<Option<(Vec<DynState>, Vec<DynState>)>>>;
/// Applies the pending client-side database migrations of one module instance.
///
/// Steps:
/// 1. In its own committed transaction, `migrate_database_version` is called
///    for the module instance (NOTE(review): presumably this initializes or
///    converts the stored version entry — confirm in `fedimint_core::db`).
/// 2. In a second transaction, the version stored under
///    `DatabaseVersionKey(module_instance_id)` is read. If it already equals
///    `target_db_version` nothing else happens.
/// 3. Otherwise the module's active and inactive states are loaded and, for
///    every version from the stored one up to (excluding) the target, the
///    matching entry of `migrations` is invoked. When a migration returns new
///    states, the stored states are replaced and the new ones are fed into the
///    next migration. The stored version is incremented after every step.
/// 4. The transaction is committed once all steps have run.
///
/// A version without an entry in `migrations` is not an error: a warning is
/// logged and the version is still incremented.
///
/// # Errors
///
/// Returns an error if `migrate_database_version` fails, if a transaction
/// cannot be committed, if the stored version is higher than
/// `target_db_version`, or if any migration function returns an error. In the
/// latter cases the second transaction is dropped without being committed.
pub async fn apply_migrations_client(
    db: &Database,
    kind: String,
    target_db_version: DatabaseVersion,
    migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>,
    module_instance_id: ModuleInstanceId,
    decoders: ModuleDecoderRegistry,
) -> Result<(), anyhow::Error> {
    // Runs and commits in a transaction of its own, before any module
    // migration is attempted.
    {
        let mut global_dbtx = db.begin_transaction().await;
        migrate_database_version(
            &mut global_dbtx.to_ref_nc(),
            target_db_version,
            Some(module_instance_id),
            kind.clone(),
        )
        .await?;

        global_dbtx.commit_tx_result().await?;
    }

    let mut global_dbtx = db.begin_transaction().await;
    let disk_version = global_dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    info!(
        ?disk_version,
        ?target_db_version,
        module_instance_id,
        kind,
        "Migrating client module database"
    );

    let db_version = if let Some(disk_version) = disk_version {
        let mut current_db_version = disk_version;

        if current_db_version > target_db_version {
            anyhow::bail!(
                "On disk database version for module {kind} was higher than the code database version."
            );
        }

        if current_db_version == target_db_version {
            // Nothing was written; explicitly mark the transaction as
            // intentionally dropped.
            global_dbtx.ignore_uncommitted();
            return Ok(());
        }

        let mut active_states =
            get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;

        while current_db_version < target_db_version {
            let new_states = if let Some(migration) = migrations.get(&current_db_version) {
                info!(target: LOG_DB, "Migrating module {kind} current: {current_db_version} target: {target_db_version}");

                // The migration only sees the module's own key space.
                migration(
                    &mut global_dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .into_nc(),
                    module_instance_id,
                    active_states.clone(),
                    inactive_states.clone(),
                    decoders.clone(),
                )
                .await?
            } else {
                warn!("Missing client db migration for version {current_db_version}");
                None
            };

            // A migration that returns states replaces everything that was
            // stored for this module; the replacement becomes the input of the
            // next migration step.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                // The old lists are handed over by value: they are reassigned
                // below, so no copy is needed.
                remove_old_and_persist_new_active_states(
                    &mut global_dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states,
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut global_dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states,
                    module_instance_id,
                )
                .await;

                active_states = new_active_states
                    .into_iter()
                    .map(|state| (state.to_bytes(), state.operation_id()))
                    .collect::<Vec<_>>();
                inactive_states = new_inactive_states
                    .into_iter()
                    .map(|state| (state.to_bytes(), state.operation_id()))
                    .collect::<Vec<_>>();
            }

            current_db_version.increment();
            global_dbtx
                .insert_entry(&DatabaseVersionKey(module_instance_id), &current_db_version)
                .await;
        }

        current_db_version
    } else {
        target_db_version
    };

    global_dbtx.commit_tx_result().await?;
    info!(target: LOG_DB, "{} module db version: {} migration complete", kind, db_version);
    Ok(())
}
/// Collects the active states that belong to `module_instance_id`.
///
/// Scans every entry found under `ActiveStateKeyPrefixBytes`, drops those of
/// other module instances and returns the remaining ones as
/// `(encoded state bytes, operation id)` pairs, in scan order. The stored
/// metadata of each entry is ignored.
pub async fn get_active_states(
    dbtx: &mut DatabaseTransaction<'_>,
    module_instance_id: ModuleInstanceId,
) -> Vec<(Vec<u8>, OperationId)> {
    let all_active = dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes).await;

    let owned_by_module = all_active.filter_map(|(key, _meta)| async move {
        if key.module_instance_id != module_instance_id {
            return None;
        }
        Some((key.state, key.operation_id))
    });

    owned_by_module.collect::<Vec<_>>().await
}
/// Collects the inactive states that belong to `module_instance_id`.
///
/// Scans every entry found under `InactiveStateKeyPrefixBytes`, drops those of
/// other module instances and returns the remaining ones as
/// `(encoded state bytes, operation id)` pairs, in scan order. The stored
/// metadata of each entry is ignored.
pub async fn get_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    module_instance_id: ModuleInstanceId,
) -> Vec<(Vec<u8>, OperationId)> {
    let all_inactive = dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

    let owned_by_module = all_inactive.filter_map(|(key, _meta)| async move {
        if key.module_instance_id != module_instance_id {
            return None;
        }
        Some((key.state, key.operation_id))
    });

    owned_by_module.collect::<Vec<_>>().await
}
/// Replaces a module instance's stored active states.
///
/// Every `(bytes, operation id)` pair in `states_to_remove` is deleted first,
/// then each state in `new_active_states` is inserted as a new entry with
/// default `ActiveStateMeta`.
///
/// # Panics
///
/// Panics if one of the entries in `states_to_remove` does not exist in the
/// database.
pub async fn remove_old_and_persist_new_active_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_active_states: Vec<DynState>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Phase 1: delete the outdated entries.
    for (state, operation_id) in states_to_remove {
        let stale_key = ActiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state,
        };
        dbtx.remove_entry(&stale_key)
            .await
            .expect("Did not delete anything");
    }

    // Phase 2: build every replacement record before writing any of them.
    let mut replacements = Vec::with_capacity(new_active_states.len());
    for state in new_active_states {
        replacements.push((
            ActiveStateKey::from_state(state),
            ActiveStateMeta::default(),
        ));
    }

    for (key, meta) in replacements {
        dbtx.insert_new_entry(&key, &meta).await;
    }
}
/// Replaces a module instance's stored inactive states.
///
/// Every `(bytes, operation id)` pair in `states_to_remove` is deleted first,
/// then each state in `new_inactive_states` is inserted as a new entry whose
/// `created_at` and `exited_at` are both taken from the current time.
///
/// # Panics
///
/// Panics if one of the entries in `states_to_remove` does not exist in the
/// database.
pub async fn remove_old_and_persist_new_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_inactive_states: Vec<DynState>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Phase 1: delete the outdated entries.
    for (state, operation_id) in states_to_remove {
        let stale_key = InactiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state,
        };
        dbtx.remove_entry(&stale_key)
            .await
            .expect("Did not delete anything");
    }

    // Phase 2: build every replacement record before writing any of them.
    let mut replacements = Vec::with_capacity(new_inactive_states.len());
    for state in new_inactive_states {
        let key = InactiveStateKey::from_state(state);
        // The original timestamps are not available here, so both are set to
        // the moment of migration.
        let meta = InactiveStateMeta {
            created_at: fedimint_core::time::now(),
            exited_at: fedimint_core::time::now(),
        };
        replacements.push((key, meta));
    }

    for (key, meta) in replacements {
        dbtx.insert_new_entry(&key, &meta).await;
    }
}