use std::cmp::Ordering;
use std::collections::{BTreeMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::ops::{self, Range};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::{anyhow, bail, ensure, Context};
use async_stream::stream;
use db::{
CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientConfigKeyPrefix,
ClientInviteCodeKey, ClientInviteCodeKeyPrefix, EncodedClientSecretKey,
};
use fedimint_core::api::{
ApiVersionSet, DynGlobalApi, DynModuleApi, GlobalFederationApi, IGlobalFederationApi,
InviteCode, WsFederationApi,
};
use fedimint_core::config::{
ClientConfig, ClientModuleConfig, FederationId, JsonClientConfig, JsonWithKind,
ModuleInitRegistry,
};
use fedimint_core::core::{
DynInput, DynOutput, IInput, IOutput, ModuleInstanceId, ModuleKind, OperationId,
};
use fedimint_core::db::{
AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, IRawDatabase,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{
ApiVersion, MultiApiVersion, SupportedApiVersionsSummary, SupportedCoreApiVersions,
SupportedModuleApiVersions,
};
use fedimint_core::task::{sleep, MaybeSend, MaybeSync, TaskGroup};
use fedimint_core::transaction::Transaction;
use fedimint_core::util::{BoxStream, NextOrPending};
use fedimint_core::{
apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send_sync, Amount, OutPoint,
TransactionId,
};
pub use fedimint_derive_secret as derivable_secret;
use fedimint_derive_secret::DerivableSecret;
use futures::StreamExt;
use module::{DynClientModule, FinalClient};
use rand::thread_rng;
use secp256k1_zkp::{PublicKey, Secp256k1};
use secret::DeriveableSecretClientExt;
use thiserror::Error;
#[cfg(not(target_family = "wasm"))]
use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
use tracing::{debug, error, info, warn};
use crate::backup::Metadata;
use crate::db::OperationLogKey;
use crate::module::init::{
ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
};
use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
use crate::oplog::OperationLog;
use crate::sm::executor::{
ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
};
use crate::sm::{
ClientSMDatabaseTransaction, DynState, Executor, GlobalContext, IState, Notifier,
OperationState, State,
};
use crate::transaction::{
tx_submission_sm_decoder, ClientInput, ClientOutput, TransactionBuilder,
TransactionBuilderBalance, TxSubmissionContext, TxSubmissionStates,
TRANSACTION_SUBMISSION_MODULE_INSTANCE,
};
pub mod backup;
pub mod db;
pub mod module;
pub mod oplog;
pub mod secret;
pub mod sm;
pub mod transaction;
pub type InstancelessDynClientInput = ClientInput<
Box<maybe_add_send_sync!(dyn IInput + 'static)>,
Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext> + 'static)>,
>;
pub type InstancelessDynClientOutput = ClientOutput<
Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext> + 'static)>,
>;
#[derive(Debug, Error)]
pub enum AddStateMachinesError {
#[error("State already exists in database")]
StateAlreadyExists,
#[error("Got {0}")]
Other(#[from] anyhow::Error),
}
pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
#[apply(async_trait_maybe_send!)]
pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
fn module_api(&self) -> DynModuleApi;
fn client_config(&self) -> &ClientConfig;
fn api(&self) -> &DynGlobalApi;
fn decoders(&self) -> &ModuleDecoderRegistry;
async fn claim_input_dyn(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
input: InstancelessDynClientInput,
) -> (TransactionId, Vec<OutPoint>);
async fn fund_output_dyn(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
output: InstancelessDynClientOutput,
) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;
async fn add_state_machine_dyn(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
sm: Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext>)>,
) -> AddStateMachinesResult;
async fn transaction_update_stream(
&self,
operation_id: OperationId,
) -> BoxStream<OperationState<TxSubmissionStates>>;
}
dyn_newtype_define! {
#[derive(Clone)]
pub DynGlobalClientContext(Arc<IGlobalClientContext>)
}
impl DynGlobalClientContext {
pub async fn await_tx_accepted(
&self,
operation_id: OperationId,
query_txid: TransactionId,
) -> Result<(), String> {
self.transaction_update_stream(operation_id)
.await
.filter_map(|tx_update| {
std::future::ready(match tx_update.state {
TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
Some(Err(submit_error))
}
_ => None,
})
})
.next_or_pending()
.await
}
pub async fn claim_input<I, S>(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
input: ClientInput<I, S>,
) -> (TransactionId, Vec<OutPoint>)
where
I: IInput + MaybeSend + MaybeSync + 'static,
S: IState<DynGlobalClientContext> + MaybeSend + MaybeSync + 'static,
{
self.claim_input_dyn(
dbtx,
InstancelessDynClientInput {
input: Box::new(input.input),
keys: input.keys,
state_machines: states_to_instanceless_dyn(input.state_machines),
},
)
.await
}
pub async fn fund_output<O, S>(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
output: ClientOutput<O, S>,
) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
where
O: IOutput + MaybeSend + MaybeSync + 'static,
S: IState<DynGlobalClientContext> + MaybeSend + MaybeSync + 'static,
{
self.fund_output_dyn(
dbtx,
InstancelessDynClientOutput {
output: Box::new(output.output),
state_machines: states_to_instanceless_dyn(output.state_machines),
},
)
.await
}
pub async fn add_state_machine<S>(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
sm: S,
) -> AddStateMachinesResult
where
S: State<GlobalContext = DynGlobalClientContext> + MaybeSend + MaybeSync + 'static,
{
self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
}
}
fn states_to_instanceless_dyn<
S: IState<DynGlobalClientContext> + MaybeSend + MaybeSync + 'static,
>(
state_gen: StateGenerator<S>,
) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext> + 'static)>> {
Arc::new(move |txid, out_idx| {
let states: Vec<S> = state_gen(txid, out_idx);
states
.into_iter()
.map(|state| box_up_state(state))
.collect()
})
}
fn box_up_state(
state: impl IState<DynGlobalClientContext> + 'static,
) -> Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext> + 'static)> {
Box::new(state)
}
impl<T> From<Arc<T>> for DynGlobalClientContext
where
T: IGlobalClientContext,
{
fn from(inner: Arc<T>) -> Self {
DynGlobalClientContext { inner }
}
}
impl GlobalContext for DynGlobalClientContext {}
impl Debug for Client {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Client")
}
}
#[derive(Clone, Debug)]
struct ModuleGlobalClientContext {
client: Arc<Client>,
module_instance_id: ModuleInstanceId,
operation: OperationId,
}
#[apply(async_trait_maybe_send!)]
impl IGlobalClientContext for ModuleGlobalClientContext {
fn module_api(&self) -> DynModuleApi {
self.api().with_module(self.module_instance_id)
}
fn api(&self) -> &DynGlobalApi {
&self.client.api
}
fn decoders(&self) -> &ModuleDecoderRegistry {
self.client.decoders()
}
fn client_config(&self) -> &ClientConfig {
self.client.config()
}
async fn claim_input_dyn(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
input: InstancelessDynClientInput,
) -> (TransactionId, Vec<OutPoint>) {
let instance_input = ClientInput {
input: DynInput::from_parts(self.module_instance_id, input.input),
keys: input.keys,
state_machines: states_add_instance(self.module_instance_id, input.state_machines),
};
self.client
.finalize_and_submit_transaction_inner(
&mut dbtx.global_tx().to_ref_nc(),
self.operation,
TransactionBuilder::new().with_input(instance_input),
)
.await
.expect("Can only fail if additional funding is needed")
}
async fn fund_output_dyn(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
output: InstancelessDynClientOutput,
) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
let instance_output = ClientOutput {
output: DynOutput::from_parts(self.module_instance_id, output.output),
state_machines: states_add_instance(self.module_instance_id, output.state_machines),
};
self.client
.finalize_and_submit_transaction_inner(
&mut dbtx.global_tx().to_ref_nc(),
self.operation,
TransactionBuilder::new().with_output(instance_output),
)
.await
}
async fn add_state_machine_dyn(
&self,
dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
sm: Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext>)>,
) -> AddStateMachinesResult {
let state = DynState::from_parts(self.module_instance_id, sm);
self.client
.executor
.add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
.await
}
async fn transaction_update_stream(
&self,
operation_id: OperationId,
) -> BoxStream<OperationState<TxSubmissionStates>> {
self.client.transaction_update_stream(operation_id).await
}
}
fn states_add_instance(
module_instance_id: ModuleInstanceId,
state_gen: StateGenerator<
Box<maybe_add_send_sync!(dyn IState<DynGlobalClientContext> + 'static)>,
>,
) -> StateGenerator<DynState<DynGlobalClientContext>> {
Arc::new(move |txid, out_idx| {
let states = state_gen(txid, out_idx);
Iterator::collect(
states
.into_iter()
.map(|state| DynState::from_parts(module_instance_id, state)),
)
})
}
#[derive(Debug)]
pub struct ClientArc {
inner: Arc<Client>,
__use_constructor_to_create: (),
}
impl ClientArc {
fn new(inner: Arc<Client>) -> Self {
inner
.client_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
inner,
__use_constructor_to_create: (),
}
}
}
impl ops::Deref for ClientArc {
type Target = Arc<Client>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ClientArc {
pub fn downgrade(&self) -> ClientWeak {
ClientWeak {
inner: Arc::downgrade(&self.inner),
}
}
}
impl Clone for ClientArc {
fn clone(&self) -> Self {
ClientArc::new(self.inner.clone())
}
}
#[derive(Debug, Clone)]
pub struct ClientWeak {
inner: Weak<Client>,
}
impl ClientWeak {
pub fn upgrade(&self) -> Option<ClientArc> {
Weak::upgrade(&self.inner).map(ClientArc::new)
}
}
impl Drop for ClientArc {
fn drop(&mut self) {
let client_count = self
.inner
.client_count
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
if client_count == 1 {
info!("Last client reference dropped, shutting down client task group");
let maybe_shutdown_confirmation = self.inner.executor.stop_executor();
#[cfg(not(target_family = "wasm"))]
{
if RuntimeHandle::current().runtime_flavor() == RuntimeFlavor::CurrentThread {
return;
}
let Some(shutdown_confirmation) = maybe_shutdown_confirmation else {
return;
};
tokio::task::block_in_place(move || {
futures::executor::block_on(async {
if shutdown_confirmation.await.is_err() {
error!("Error while awaiting client shutdown confirmation");
}
});
});
}
}
}
}
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
&[ApiVersion { major: 0, minor: 0 }];
pub type ModuleGlobalContextGen = ContextGen<DynGlobalClientContext>;
pub struct ClientModuleInstance<'m, M: ClientModule> {
pub id: ModuleInstanceId,
pub db: Database,
pub api: DynModuleApi,
module: &'m M,
}
impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
where
M: ClientModule,
{
type Target = M;
fn deref(&self) -> &Self::Target {
self.module
}
}
pub struct Client {
config: ClientConfig,
decoders: ModuleDecoderRegistry,
db: Database,
federation_id: FederationId,
federation_meta: BTreeMap<String, String>,
primary_module_instance: ModuleInstanceId,
modules: ClientModuleRegistry,
module_inits: ClientModuleInitRegistry,
executor: Executor<DynGlobalClientContext>,
api: DynGlobalApi,
root_secret: DerivableSecret,
operation_log: OperationLog,
secp_ctx: Secp256k1<secp256k1_zkp::All>,
client_count: AtomicUsize,
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
self.api.as_ref()
}
pub fn api_clone(&self) -> DynGlobalApi {
self.api.clone()
}
pub async fn start_executor(self: &Arc<Self>) {
debug!(
"Starting fedimint client executor (version: {})",
env!("FEDIMINT_BUILD_CODE_VERSION")
);
self.executor.start_executor(self.context_gen()).await;
}
pub fn federation_id(&self) -> FederationId {
self.federation_id
}
fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
let client_inner = Arc::downgrade(self);
Arc::new(move |module_instance, operation| {
ModuleGlobalClientContext {
client: client_inner
.clone()
.upgrade()
.expect("ModuleGlobalContextGen called after client was dropped"),
module_instance_id: module_instance,
operation,
}
.into()
})
}
fn config(&self) -> &ClientConfig {
&self.config
}
pub fn decoders(&self) -> &ModuleDecoderRegistry {
&self.decoders
}
fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
self.try_get_module(instance)
.expect("Module instance not found")
}
fn try_get_module(
&self,
instance: ModuleInstanceId,
) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
Some(self.modules.get(instance)?.as_ref())
}
fn transaction_builder_balance(
&self,
builder: &TransactionBuilder,
) -> TransactionBuilderBalance {
let mut in_amount = Amount::ZERO;
let mut out_amount = Amount::ZERO;
let mut fee_amount = Amount::ZERO;
for input in &builder.inputs {
let module = self.get_module(input.input.module_instance_id());
let item_amount = module.input_amount(&input.input).expect(
"We only build transactions with input versions that are supported by the module",
);
in_amount += item_amount.amount;
fee_amount += item_amount.fee;
}
for output in &builder.outputs {
let module = self.get_module(output.output.module_instance_id());
let item_amount = module.output_amount(&output.output).expect(
"We only build transactions with output versions that are supported by the module",
);
out_amount += item_amount.amount;
fee_amount += item_amount.fee;
}
let total_out_amount = out_amount + fee_amount;
match total_out_amount.cmp(&in_amount) {
Ordering::Equal => TransactionBuilderBalance::Balanced,
Ordering::Less => TransactionBuilderBalance::Overfunded(in_amount - total_out_amount),
Ordering::Greater => {
TransactionBuilderBalance::Underfunded(total_out_amount - in_amount)
}
}
}
pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
}
pub fn get_meta(&self, key: &str) -> Option<String> {
self.federation_meta.get(key).cloned()
}
fn root_secret(&self) -> DerivableSecret {
self.root_secret.clone()
}
pub async fn add_state_machines(
&self,
dbtx: &mut DatabaseTransaction<'_>,
states: Vec<DynState<DynGlobalClientContext>>,
) -> AddStateMachinesResult {
self.executor.add_state_machines_dbtx(dbtx, states).await
}
pub async fn get_active_operations(&self) -> HashSet<OperationId> {
let active_states = self.executor.get_active_states().await;
let mut active_operations = HashSet::with_capacity(active_states.len());
let mut dbtx = self.db().begin_transaction_nc().await;
for (state, _) in active_states {
let operation_id = state.operation_id();
if dbtx
.get_value(&OperationLogKey { operation_id })
.await
.is_some()
{
active_operations.insert(operation_id);
}
}
active_operations
}
pub fn operation_log(&self) -> &OperationLog {
&self.operation_log
}
async fn finalize_transaction(
&self,
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
mut partial_transaction: TransactionBuilder,
) -> anyhow::Result<(
Transaction,
Vec<DynState<DynGlobalClientContext>>,
Range<u64>,
)> {
if let TransactionBuilderBalance::Underfunded(missing_amount) =
self.transaction_builder_balance(&partial_transaction)
{
let inputs = self
.primary_module()
.create_sufficient_input(
self.primary_module_instance,
dbtx,
operation_id,
missing_amount,
)
.await?;
partial_transaction.inputs.extend(inputs);
}
let mut change_range = Range {
start: partial_transaction.outputs.len() as u64,
end: partial_transaction.outputs.len() as u64,
};
if let TransactionBuilderBalance::Overfunded(excess_amount) =
self.transaction_builder_balance(&partial_transaction)
{
let change_outputs = self
.primary_module()
.create_exact_output(
self.primary_module_instance,
dbtx,
operation_id,
excess_amount,
)
.await;
change_range.end += change_outputs.len() as u64;
partial_transaction.outputs.extend(change_outputs);
}
assert!(
matches!(
self.transaction_builder_balance(&partial_transaction),
TransactionBuilderBalance::Balanced
),
"Transaction is balanced after the previous two operations"
);
let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
Ok((tx, states, change_range))
}
pub async fn finalize_and_submit_transaction<F, M>(
&self,
operation_id: OperationId,
operation_type: &str,
operation_meta: F,
tx_builder: TransactionBuilder,
) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
where
F: Fn(TransactionId, Vec<OutPoint>) -> M + Clone + MaybeSend + MaybeSync,
M: serde::Serialize + MaybeSend,
{
let operation_type = operation_type.to_owned();
let autocommit_res = self
.db
.autocommit(
|dbtx, _| {
let operation_type = operation_type.clone();
let tx_builder = tx_builder.clone();
let operation_meta = operation_meta.clone();
Box::pin(async move {
if Client::operation_exists(dbtx, operation_id).await {
bail!("There already exists an operation with id {operation_id:?}")
}
let (txid, change) = self
.finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
.await?;
self.operation_log()
.add_operation_log_entry(
dbtx,
operation_id,
&operation_type,
operation_meta(txid, change.clone()),
)
.await;
Ok((txid, change))
})
},
Some(100), )
.await;
match autocommit_res {
Ok(txid) => Ok(txid),
Err(AutocommitError::ClosureError { error, .. }) => Err(error),
Err(AutocommitError::CommitFailed {
attempts,
last_error,
}) => panic!(
"Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
),
}
}
async fn finalize_and_submit_transaction_inner(
&self,
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
tx_builder: TransactionBuilder,
) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
let (transaction, mut states, change_range) = self
.finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
.await?;
let txid = transaction.tx_hash();
let change_outpoints = change_range
.into_iter()
.map(|out_idx| OutPoint { txid, out_idx })
.collect();
let tx_submission_sm = DynState::from_typed(
TRANSACTION_SUBMISSION_MODULE_INSTANCE,
OperationState {
operation_id,
state: TxSubmissionStates::Created(transaction),
},
);
states.push(tx_submission_sm);
self.executor.add_state_machines_dbtx(dbtx, states).await?;
Ok((txid, change_outpoints))
}
async fn transaction_update_stream(
&self,
operation_id: OperationId,
) -> BoxStream<'static, OperationState<TxSubmissionStates>> {
self.executor
.notifier()
.module_notifier::<OperationState<TxSubmissionStates>>(
TRANSACTION_SUBMISSION_MODULE_INSTANCE,
)
.subscribe(operation_id)
.await
}
async fn operation_exists(
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
) -> bool {
let active_state_exists = dbtx
.find_by_prefix(&ActiveOperationStateKeyPrefix::<DynGlobalClientContext> {
operation_id,
_pd: Default::default(),
})
.await
.next()
.await
.is_some();
let inactive_state_exists = dbtx
.find_by_prefix(&InactiveOperationStateKeyPrefix::<DynGlobalClientContext> {
operation_id,
_pd: Default::default(),
})
.await
.next()
.await
.is_some();
active_state_exists || inactive_state_exists
}
pub async fn await_primary_module_output(
&self,
operation_id: OperationId,
out_point: OutPoint,
) -> anyhow::Result<Amount> {
self.primary_module()
.await_primary_module_output(operation_id, out_point)
.await
}
pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
let all_active_states = self.executor.get_active_states().await;
all_active_states
.into_iter()
.any(|context| context.0.operation_id() == operation_id)
}
pub fn get_first_module<M: ClientModule>(&self) -> ClientModuleInstance<M> {
let module_kind = M::kind();
let id = self
.get_first_instance(&module_kind)
.unwrap_or_else(|| panic!("No modules found of kind {module_kind}"));
let module: &M = self
.try_get_module(id)
.unwrap_or_else(|| panic!("Unknown module instance {id}"))
.as_any()
.downcast_ref::<M>()
.unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()));
ClientModuleInstance {
id,
db: self.db().with_prefix_module_id(id),
api: self.api().with_module(id),
module,
}
}
pub fn get_module_client_dyn(
&self,
instance_id: ModuleInstanceId,
) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
self.try_get_module(instance_id)
.ok_or(anyhow!("Unknown module instance {}", instance_id))
}
pub fn db(&self) -> &Database {
&self.db
}
pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
TransactionUpdates {
update_stream: self.transaction_update_stream(operation_id).await,
}
}
pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
if &self
.modules
.get_with_kind(self.primary_module_instance)
.expect("must have primary module")
.0
== module_kind
{
return Some(self.primary_module_instance);
}
self.modules
.iter_modules()
.find(|(_, kind, _module)| *kind == module_kind)
.map(|(instance_id, _, _)| instance_id)
}
pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
get_decoded_client_secret::<T>(self.db()).await
}
pub async fn await_primary_module_outputs(
&self,
operation_id: OperationId,
outputs: Vec<OutPoint>,
) -> anyhow::Result<Amount> {
let mut amount = Amount::ZERO;
for out_point in outputs {
amount += self
.await_primary_module_output(operation_id, out_point)
.await?;
}
Ok(amount)
}
pub fn get_config(&self) -> &ClientConfig {
&self.config
}
pub fn get_config_json(&self) -> JsonClientConfig {
JsonClientConfig {
global: self.get_config().global.clone(),
modules: self
.get_config()
.modules
.iter()
.map(|(instance_id, ClientModuleConfig { kind, config, .. })| {
(
*instance_id,
JsonWithKind::new(
kind.clone(),
config.clone().expect_decoded().to_json().into(),
),
)
})
.collect(),
}
}
pub fn primary_module(&self) -> &DynClientModule {
self.modules
.get(self.primary_module_instance)
.expect("primary module must be present")
}
pub async fn get_balance(&self) -> Amount {
self.primary_module()
.get_balance(
self.primary_module_instance,
&mut self.db().begin_transaction_nc().await,
)
.await
}
pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
let initial_balance = self.get_balance().await;
let db = self.db().clone();
let primary_module = self.primary_module().clone();
let primary_module_instance = self.primary_module_instance;
Box::pin(stream! {
yield initial_balance;
let mut prev_balance = initial_balance;
while let Some(()) = balance_changes.next().await {
let mut dbtx = db.begin_transaction_nc().await;
let balance = primary_module
.get_balance(primary_module_instance, &mut dbtx)
.await;
if balance != prev_balance {
prev_balance = balance;
yield balance;
}
}
})
}
pub async fn discover_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
Ok(self
.api()
.discover_api_version_set(
&Self::supported_api_versions_summary_static(self.get_config(), &self.module_inits)
.await,
)
.await?)
}
pub async fn discover_common_api_version_static(
config: &ClientConfig,
client_module_init: &ClientModuleInitRegistry,
api: &DynGlobalApi,
) -> anyhow::Result<ApiVersionSet> {
Ok(api
.discover_api_version_set(
&Self::supported_api_versions_summary_static(config, client_module_init).await,
)
.await?)
}
pub async fn supported_api_versions_summary_static(
config: &ClientConfig,
client_module_init: &ClientModuleInitRegistry,
) -> SupportedApiVersionsSummary {
SupportedApiVersionsSummary {
core: SupportedCoreApiVersions {
core_consensus: config.global.consensus_version,
api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
.expect("must not have conflicting versions"),
},
modules: config
.modules
.iter()
.filter_map(|(&module_instance_id, module_config)| {
client_module_init
.get(module_config.kind())
.map(|module_init| {
(
module_instance_id,
SupportedModuleApiVersions {
core_consensus: config.global.consensus_version,
module_consensus: module_config.version,
api: module_init.supported_api_versions(),
},
)
})
})
.collect(),
}
}
async fn load_and_refresh_common_api_version_static(
config: &ClientConfig,
module_inits: &ModuleInitRegistry<DynClientModuleInit>,
api: &DynGlobalApi,
db: &Database,
) -> anyhow::Result<ApiVersionSet> {
if let Some(v) = db
.begin_transaction()
.await
.get_value(&CachedApiVersionSetKey)
.await
{
debug!("Found existing cached common api versions");
let config = config.clone();
let module_inits = module_inits.clone();
let api = api.clone();
let db = db.clone();
TaskGroup::new()
.spawn("refresh_common_api_version_static", |_| async move {
if let Err(e) =
Self::refresh_common_api_version_static(&config, &module_inits, &api, &db)
.await
{
warn!("Failed to discover common api versions: {e}");
}
})
.await;
return Ok(v.0);
}
debug!("No existing cached common api versions found, waiting for initial discovery");
Self::refresh_common_api_version_static(config, module_inits, api, db).await
}
async fn refresh_common_api_version_static(
config: &ClientConfig,
module_inits: &ModuleInitRegistry<DynClientModuleInit>,
api: &DynGlobalApi,
db: &Database,
) -> anyhow::Result<ApiVersionSet> {
debug!("Refreshing common api versions");
let common_api_versions =
Client::discover_common_api_version_static(config, module_inits, api).await?;
debug!("Updating the cached common api versions");
let mut dbtx = db.begin_transaction().await;
let _ = dbtx
.insert_entry(
&CachedApiVersionSetKey,
&CachedApiVersionSet(common_api_versions.clone()),
)
.await;
dbtx.commit_tx().await;
Ok(common_api_versions)
}
}
pub struct TransactionUpdates {
update_stream: BoxStream<'static, OperationState<TxSubmissionStates>>,
}
impl TransactionUpdates {
pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
self.update_stream
.filter_map(|tx_update| {
std::future::ready(match tx_update.state {
TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
Some(Err(submit_error))
}
_ => None,
})
})
.next_or_pending()
.await
}
}
#[derive(Debug, Clone)]
pub struct FederationInfo {
config: ClientConfig,
invite_code: Option<InviteCode>,
}
impl FederationInfo {
pub async fn from_invite_code(invite: InviteCode) -> anyhow::Result<FederationInfo> {
let config = try_download_config(invite.clone(), 10).await?;
Ok(FederationInfo {
config,
invite_code: Some(invite),
})
}
pub async fn from_config(config: ClientConfig) -> anyhow::Result<FederationInfo> {
Ok(FederationInfo {
config,
invite_code: None,
})
}
pub fn config(&self) -> &ClientConfig {
&self.config
}
pub fn invite_code(&self) -> Option<InviteCode> {
self.invite_code.clone()
}
pub fn meta<V: serde::de::DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<V>> {
let Some(str_value) = self.config.global.meta.get(key) else {
return Ok(None);
};
serde_json::from_str(str_value).context(format!("Decoding meta field '{key}' failed"))
}
pub fn api(&self) -> DynGlobalApi {
DynGlobalApi::from(WsFederationApi::from_config(&self.config))
}
pub fn federation_id(&self) -> FederationId {
self.config.global.federation_id()
}
}
#[derive(Default)]
pub struct ClientBuilder {
module_inits: ClientModuleInitRegistry,
primary_module_instance: Option<ModuleInstanceId>,
config: Option<FederationInfo>,
db: Option<DatabaseSource>,
}
pub enum DatabaseSource {
Fresh(Database),
Reuse(ClientArc),
}
impl ClientBuilder {
pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
self.module_inits = module_inits;
}
pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
self.module_inits.attach(module_init);
}
pub fn with_federation_info(&mut self, federation_info: FederationInfo) {
let was_replaced = self.config.replace(federation_info).is_some();
assert!(
!was_replaced,
"Only one configuration source can be given to the builder."
)
}
pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
let was_replaced = self
.primary_module_instance
.replace(primary_module_instance)
.is_some();
assert!(
!was_replaced,
"Only one primary module can be given to the builder."
)
}
pub fn with_raw_database<D: IRawDatabase + 'static>(&mut self, db: D) {
self.with_database(db.into());
}
pub fn with_database(&mut self, db: Database) {
let was_replaced = self.db.replace(DatabaseSource::Fresh(db)).is_some();
assert!(
!was_replaced,
"Only one database can be given to the builder."
);
}
pub fn with_old_client_database(&mut self, client: ClientArc) {
let was_replaced = self.db.replace(DatabaseSource::Reuse(client)).is_some();
assert!(
!was_replaced,
"Only one database can be given to the builder."
);
}
pub async fn store_encodable_client_secret<T: Encodable>(
&mut self,
secret: T,
) -> anyhow::Result<()> {
let mut dbtx = match self.db.as_ref().ok_or(anyhow!("No database provided"))? {
DatabaseSource::Fresh(db) => db.begin_transaction().await,
DatabaseSource::Reuse(client) => client.db().begin_transaction().await,
};
match dbtx.get_value(&EncodedClientSecretKey).await {
Some(_) => bail!("Encoded client secret already exists, cannot overwrite"),
None => {
let encoded_secret = T::consensus_encode_to_vec(&secret)?;
dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
.await;
dbtx.commit_tx().await;
Ok(())
}
}
}
pub async fn load_decodable_client_secret<T: Decodable>(&mut self) -> anyhow::Result<T> {
let mut dbtx = match self.db.as_ref().ok_or(anyhow!("No database provided"))? {
DatabaseSource::Fresh(db) => db.begin_transaction().await,
DatabaseSource::Reuse(client) => client.db().begin_transaction().await,
};
let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
dbtx.commit_tx().await;
match client_secret {
Some(client_secret) => {
T::consensus_decode(&mut client_secret.as_slice(), &Default::default())
.map_err(|e| anyhow!("Decoding failed: {e}"))
}
None => bail!("Encoded client secret not present in DB"),
}
}
pub async fn get_federation_id(&self) -> anyhow::Result<FederationId> {
if let Some(db) = self.db.as_ref() {
let db = match db {
DatabaseSource::Fresh(db) => db,
DatabaseSource::Reuse(client) => &client.inner.db,
};
if let Some(config) = get_config_from_db(db).await {
return Ok(config.global.federation_id());
}
}
if let Some(federation_info) = &self.config {
return Ok(federation_info.federation_id());
}
bail!("No config source present");
}
pub async fn build_restoring_from_backup(
self,
root_secret: DerivableSecret,
) -> anyhow::Result<(ClientArc, Metadata)> {
let client = self.build(root_secret).await?;
let backup = client.download_backup_from_federation().await?;
let metadata = client.restore_from_backup(backup).await?;
Ok((client, metadata))
}
pub async fn build(self, root_secret: DerivableSecret) -> anyhow::Result<ClientArc> {
let client = self.build_stopped(root_secret).await?;
client.start_executor().await;
Ok(client)
}
pub async fn build_stopped(self, root_secret: DerivableSecret) -> anyhow::Result<ClientArc> {
let (config, decoders, db) = match self.db.ok_or(anyhow!("No database was provided"))? {
DatabaseSource::Fresh(db) => {
let config = get_config(&db, self.config.clone()).await?;
let mut decoders = client_decoders(
&self.module_inits,
config
.modules
.iter()
.map(|(module_instance, module_config)| {
(*module_instance, module_config.kind())
}),
)?;
decoders.register_module(
TRANSACTION_SUBMISSION_MODULE_INSTANCE,
ModuleKind::from_static_str("tx_submission"),
tx_submission_sm_decoder(),
);
let db = db.with_decoders(decoders.clone());
(config, decoders, db)
}
DatabaseSource::Reuse(client) => {
let db = client.inner.db.clone();
let decoders = client.inner.decoders.clone();
let config = get_config(&db, self.config.clone()).await?;
(config, decoders, db)
}
};
let config = config.redecode_raw(&decoders)?;
let primary_module_instance = self
.primary_module_instance
.ok_or(anyhow!("No primary module instance id was provided"))?;
let notifier = Notifier::new(db.clone());
let api = DynGlobalApi::from(WsFederationApi::from_config(&config));
let common_api_versions = Client::load_and_refresh_common_api_version_static(
&config,
&self.module_inits,
&api,
&db,
)
.await?;
let final_client = FinalClient::default();
let root_secret = root_secret.federation_key(&config.global.federation_id());
let modules = {
let mut modules = ClientModuleRegistry::default();
for (module_instance, module_config) in config.modules.clone() {
let kind = module_config.kind().clone();
let Some(module_init) = self.module_inits.get(&kind) else {
warn!("Module kind {kind} of instance {module_instance} not found in module gens, skipping");
continue;
};
let Some(&api_version) = common_api_versions.modules.get(&module_instance) else {
warn!("Module kind {kind} of instance {module_instance} has not compatible api version, skipping");
continue;
};
let module = module_init
.init(
final_client.clone(),
config.global.federation_id(),
module_config,
db.clone(),
module_instance,
api_version,
root_secret.derive_module_secret(module_instance),
notifier.clone(),
api.clone(),
)
.await?;
if primary_module_instance == module_instance && !module.supports_being_primary() {
bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module");
}
modules.register_module(module_instance, kind, module);
}
modules
};
let executor = {
let mut executor_builder = Executor::<DynGlobalClientContext>::builder();
executor_builder
.with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
for (module_instance_id, _, module) in modules.iter_modules() {
executor_builder.with_module_dyn(module.context(module_instance_id));
}
executor_builder.build(db.clone(), notifier).await
};
let client_inner = Arc::new(Client {
config: config.clone(),
decoders,
db: db.clone(),
federation_id: config.global.federation_id(),
federation_meta: config.global.meta,
primary_module_instance,
modules,
module_inits: self.module_inits.clone(),
executor,
api,
secp_ctx: Secp256k1::new(),
root_secret,
operation_log: OperationLog::new(db),
client_count: Default::default(),
});
let client_arc = ClientArc::new(client_inner);
final_client.set(client_arc.downgrade());
Ok(client_arc)
}
}
async fn get_config(
db: &Database,
maybe_federation_info: Option<FederationInfo>,
) -> anyhow::Result<ClientConfig> {
if let Some(config) = get_config_from_db(db).await {
ensure!(
maybe_federation_info.is_none(),
"Alternative config source provided but config was found in DB"
);
return Ok(config);
}
let federation_info = maybe_federation_info.ok_or(anyhow!("No config source was provided"))?;
let mut dbtx = db.begin_transaction().await;
dbtx.insert_new_entry(
&ClientConfigKey {
id: federation_info.federation_id(),
},
&federation_info.config().clone(),
)
.await;
if let Some(invite_code) = federation_info.invite_code() {
dbtx.insert_new_entry(&ClientInviteCodeKey {}, &invite_code)
.await;
}
dbtx.commit_tx_result().await?;
Ok(federation_info.config().clone())
}
pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
let mut dbtx = db.begin_transaction().await;
#[allow(clippy::let_and_return)]
let config = dbtx
.find_by_prefix(&ClientConfigKeyPrefix)
.await
.next()
.await
.map(|(_, config)| config);
config
}
pub async fn get_invite_code_from_db(db: &Database) -> Option<InviteCode> {
let mut dbtx = db.begin_transaction().await;
#[allow(clippy::let_and_return)]
let invite = dbtx
.find_by_prefix(&ClientInviteCodeKeyPrefix)
.await
.next()
.await
.map(|(_, invite)| invite);
invite
}
async fn try_download_config(
invite_code: InviteCode,
max_retries: usize,
) -> anyhow::Result<ClientConfig> {
let api = Arc::new(WsFederationApi::from_invite_code(&[invite_code.clone()]))
as Arc<dyn IGlobalFederationApi + Send + Sync + 'static>;
let mut num_retries = 0;
let wait_millis = 500;
loop {
if num_retries > max_retries {
break Err(anyhow!("Failed to download client config"));
}
match api.download_client_config(&invite_code).await {
Ok(cfg) => {
break Ok(cfg);
}
Err(e) => {
debug!("Failed to download client config {:?}", e);
sleep(Duration::from_millis(wait_millis)).await;
}
}
num_retries += 1;
}
}
pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
let mut tx = db.begin_transaction().await;
let client_secret = tx.get_value(&EncodedClientSecretKey).await;
tx.commit_tx().await;
match client_secret {
Some(client_secret) => {
T::consensus_decode(&mut client_secret.as_slice(), &Default::default())
.map_err(|e| anyhow!("Decoding failed: {e}"))
}
None => bail!("Encoded client secret not present in DB"),
}
}
pub fn client_decoders<'a>(
registry: &ModuleInitRegistry<DynClientModuleInit>,
module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
) -> anyhow::Result<ModuleDecoderRegistry> {
let mut modules = BTreeMap::new();
for (id, kind) in module_kinds {
let Some(init) = registry.get(kind) else {
info!("Detected configuration for unsupported module id: {id}, kind: {kind}");
continue;
};
modules.insert(
id,
(
kind.clone(),
IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
),
);
}
Ok(ModuleDecoderRegistry::from(modules))
}