fedimint_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::explicit_deref_methods)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::return_self_not_must_use)]
10#![allow(clippy::too_many_lines)]
11#![allow(clippy::type_complexity)]
12
13//! # Client library for fedimintd
14//!
15//! This library provides a client interface to build module clients that can be
16//! plugged together into a fedimint client that exposes a high-level interface
17//! for application authors to integrate with.
18//!
19//! ## Module Clients
20//! Module clients have to at least implement the [`module::ClientModule`] trait
21//! and a factory struct implementing [`module::init::ClientModuleInit`]. The
22//! `ClientModule` trait defines the module types (tx inputs, outputs, etc.) as
23//! well as the module's [state machines](sm::State).
24//!
25//! ### State machines
26//! State machines are spawned when starting operations and drive them
27//! forward in the background. All module state machines are run by a central
28//! [`sm::Executor`]. This means typically starting an operation shall return
29//! instantly.
30//!
31//! For example when doing a deposit the function starting it would immediately
32//! return a deposit address and a [`OperationId`] (important concept, highly
33//! recommended to read the docs) while spawning a state machine checking the
34//! blockchain for incoming bitcoin transactions. The progress of these state
35//! machines can then be *observed* using the operation id, but no further user
36//! interaction is required to drive them forward.
37//!
38//! ### State Machine Contexts
39//! State machines have access to both a [global
40//! context](`DynGlobalClientContext`) as well as to a [module-specific
41//! context](module::ClientModule::context).
42//!
43//! The global context provides access to the federation API and allows to claim
44//! module outputs (and transferring the value into the client's wallet), which
45//! can be used for refunds.
46//!
47//! The client-specific context can be used for other purposes, such as
48//! supplying config to the state transitions or giving access to other APIs
49//! (e.g. LN gateway in case of the lightning module).
50//!
51//! ### Extension traits
52//! The modules themselves can only create inputs and outputs that then have to
53//! be combined into transactions by the user and submitted via
54//! [`Client::finalize_and_submit_transaction`]. To make this easier most module
55//! client implementations contain an extension trait which is implemented for
56//! [`Client`] and allows to create the most typical fedimint transactions with
57//! a single function call.
58//!
59//! To observe the progress each high level operation function should be
60//! accompanied by one returning a stream of high-level operation updates.
61//! Internally that stream queries the state machines belonging to the
62//! operation to determine the high-level operation state.
63//!
64//! ### Primary Modules
65//! Not all modules have the ability to hold money for long. E.g. the lightning
66//! module and its smart contracts are only used to incentivize LN payments, not
67//! to hold money. The mint module on the other hand holds e-cash note and can
68//! thus be used to fund transactions and to absorb change. Module clients with
69//! this ability should implement [`ClientModule::supports_being_primary`] and
70//! related methods.
71//!
72//! For a example of a client module see [the mint client](https://github.com/fedimint/fedimint/blob/master/modules/fedimint-mint-client/src/lib.rs).
73//!
74//! ## Client
75//! The [`Client`] struct is the main entry point for application authors. It is
76//! constructed using its builder which can be obtained via [`Client::builder`].
77//! The supported module clients have to be chosen at compile time while the
78//! actually available ones will be determined by the config loaded at runtime.
79//!
80//! For a hacky instantiation of a complete client see the [`ng` subcommand of `fedimint-cli`](https://github.com/fedimint/fedimint/blob/55f9d88e17d914b92a7018de677d16e57ed42bf6/fedimint-cli/src/ng.rs#L56-L73).
81
82use std::collections::{BTreeMap, HashSet};
83use std::fmt::{Debug, Formatter};
84use std::future::pending;
85use std::ops::{self, Range};
86use std::pin::Pin;
87use std::sync::{Arc, Weak};
88use std::time::Duration;
89
90use anyhow::{anyhow, bail, ensure, format_err, Context};
91use api::ClientRawFederationApiExt as _;
92use async_stream::{stream, try_stream};
93use backup::ClientBackup;
94use bitcoin::secp256k1;
95use db::event_log::{
96    self, run_event_log_ordering_task, DBTransactionEventLogExt, Event, EventKind, EventLogEntry,
97    EventLogId,
98};
99use db::{
100    apply_migrations_client, apply_migrations_core_client, get_core_client_database_migrations,
101    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientInitStateKey,
102    ClientModuleRecovery, ClientPreRootSecretHashKey, EncodedClientSecretKey, InitMode,
103    PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey,
104};
105use fedimint_api_client::api::net::Connector;
106use fedimint_api_client::api::{
107    ApiVersionSet, DynGlobalApi, DynModuleApi, FederationApiExt, GlobalFederationApiWithCacheExt,
108    IGlobalFederationApi, WsFederationApi,
109};
110use fedimint_core::config::{
111    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
112};
113use fedimint_core::core::{
114    DynInput, DynOutput, IInput, IOutput, IntoDynInstance as _, ModuleInstanceId, ModuleKind,
115    OperationId,
116};
117use fedimint_core::db::{
118    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
119    IDatabaseTransactionOpsCoreTyped, NonCommittable,
120};
121use fedimint_core::encoding::{Decodable, Encodable};
122use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
123use fedimint_core::invite_code::InviteCode;
124use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
125use fedimint_core::module::{
126    ApiAuth, ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
127    SupportedCoreApiVersions, SupportedModuleApiVersions,
128};
129use fedimint_core::net::api_announcement::SignedApiAnnouncement;
130use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
131use fedimint_core::transaction::Transaction;
132use fedimint_core::util::{backoff_util, retry, BoxStream, NextOrPending, SafeUrl};
133use fedimint_core::{
134    apply, async_trait_maybe_send, dyn_newtype_define, fedimint_build_code_version_env,
135    maybe_add_send, maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
136    TransactionId,
137};
138pub use fedimint_derive_secret as derivable_secret;
139use fedimint_derive_secret::DerivableSecret;
140use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
141use futures::stream::FuturesUnordered;
142use futures::{Future, Stream, StreamExt};
143use meta::{LegacyMetaSource, MetaService};
144use module::recovery::RecoveryProgress;
145use module::{DynClientModule, FinalClient};
146use rand::thread_rng;
147use secp256k1::{PublicKey, Secp256k1};
148use secret::{DeriveableSecretClientExt, PlainRootSecretStrategy, RootSecretStrategy as _};
149use serde::{Deserialize, Serialize};
150use thiserror::Error;
151#[cfg(not(target_family = "wasm"))]
152use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
153use tokio::sync::{broadcast, watch, RwLock};
154use tokio_stream::wrappers::WatchStream;
155use tracing::{debug, error, info, trace, warn};
156use transaction::{
157    ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputSM, TxSubmissionStatesSM,
158};
159
160use crate::api_announcements::{get_api_urls, run_api_announcement_sync, ApiAnnouncementPrefix};
161use crate::api_version_discovery::discover_common_api_versions_set;
162use crate::backup::Metadata;
163use crate::db::{ClientMetadataKey, ClientModuleRecoveryState, InitState, OperationLogKey};
164use crate::module::init::{
165    ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
166};
167use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
168use crate::oplog::OperationLog;
169use crate::sm::executor::{
170    ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
171};
172use crate::sm::{ClientSMDatabaseTransaction, DynState, Executor, IState, Notifier, State};
173use crate::transaction::{
174    tx_submission_sm_decoder, ClientInput, ClientOutputBundle, TransactionBuilder,
175    TxSubmissionContext, TxSubmissionStates, TRANSACTION_SUBMISSION_MODULE_INSTANCE,
176};
177
178pub mod api;
179
180/// Client backup
181pub mod backup;
182/// Database keys used by the client
183pub mod db;
184/// Environment variables
185pub mod envs;
186/// Module client interface definitions
187pub mod module;
188/// Operation log subsystem of the client
189pub mod oplog;
190/// Secret handling & derivation
191pub mod secret;
192/// Client state machine interfaces and executor implementation
193pub mod sm;
194/// Structs and interfaces to construct Fedimint transactions
195pub mod transaction;
196
197mod api_version_discovery;
198
199pub mod api_announcements;
200/// Management of meta fields
201pub mod meta;
202
203#[derive(Serialize, Deserialize)]
204pub struct TxCreatedEvent {
205    txid: TransactionId,
206    operation_id: OperationId,
207}
208
209impl Event for TxCreatedEvent {
210    const MODULE: Option<ModuleKind> = None;
211
212    const KIND: EventKind = EventKind::from_static("tx-created");
213}
214
215#[derive(Serialize, Deserialize)]
216pub struct TxAcceptedEvent {
217    txid: TransactionId,
218    operation_id: OperationId,
219}
220
221impl Event for TxAcceptedEvent {
222    const MODULE: Option<ModuleKind> = None;
223
224    const KIND: EventKind = EventKind::from_static("tx-accepted");
225}
226
227#[derive(Serialize, Deserialize)]
228pub struct TxRejectedEvent {
229    txid: TransactionId,
230    error: String,
231    operation_id: OperationId,
232}
233impl Event for TxRejectedEvent {
234    const MODULE: Option<ModuleKind> = None;
235
236    const KIND: EventKind = EventKind::from_static("tx-rejected");
237}
238
239#[derive(Serialize, Deserialize)]
240pub struct ModuleRecoveryStarted {
241    module_id: ModuleInstanceId,
242}
243
244impl Event for ModuleRecoveryStarted {
245    const MODULE: Option<ModuleKind> = None;
246
247    const KIND: EventKind = EventKind::from_static("module-recovery-started");
248}
249
250#[derive(Serialize, Deserialize)]
251pub struct ModuleRecoveryCompleted {
252    module_id: ModuleInstanceId,
253}
254
255impl Event for ModuleRecoveryCompleted {
256    const MODULE: Option<ModuleKind> = None;
257
258    const KIND: EventKind = EventKind::from_static("module-recovery-completed");
259}
260
261pub type InstancelessDynClientInput = ClientInput<Box<maybe_add_send_sync!(dyn IInput + 'static)>>;
262
263pub type InstancelessDynClientInputSM =
264    ClientInputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
265
266pub type InstancelessDynClientInputBundle = ClientInputBundle<
267    Box<maybe_add_send_sync!(dyn IInput + 'static)>,
268    Box<maybe_add_send_sync!(dyn IState + 'static)>,
269>;
270
271pub type InstancelessDynClientOutput =
272    ClientOutput<Box<maybe_add_send_sync!(dyn IOutput + 'static)>>;
273
274pub type InstancelessDynClientOutputSM =
275    ClientOutputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
276pub type InstancelessDynClientOutputBundle = ClientOutputBundle<
277    Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
278    Box<maybe_add_send_sync!(dyn IState + 'static)>,
279>;
280
281#[derive(Debug, Error)]
282pub enum AddStateMachinesError {
283    #[error("State already exists in database")]
284    StateAlreadyExists,
285    #[error("Got {0}")]
286    Other(#[from] anyhow::Error),
287}
288
289pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
290
291#[apply(async_trait_maybe_send!)]
292pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
293    /// Returned a reference client's module API client, so that module-specific
294    /// calls can be made
295    fn module_api(&self) -> DynModuleApi;
296
297    async fn client_config(&self) -> ClientConfig;
298
299    /// Returns a reference to the client's federation API client. The provided
300    /// interface [`IGlobalFederationApi`] typically does not provide the
301    /// necessary functionality, for this extension traits like
302    /// [`fedimint_api_client::api::IGlobalFederationApi`] have to be used.
303    // TODO: Could be removed in favor of client() except for testing
304    fn api(&self) -> &DynGlobalApi;
305
306    fn decoders(&self) -> &ModuleDecoderRegistry;
307
308    /// This function is mostly meant for internal use, you are probably looking
309    /// for [`DynGlobalClientContext::claim_inputs`].
310    /// Returns transaction id of the funding transaction and an optional
311    /// `OutPoint` that represents change if change was added.
312    async fn claim_inputs_dyn(
313        &self,
314        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
315        inputs: InstancelessDynClientInputBundle,
316    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;
317
318    /// This function is mostly meant for internal use, you are probably looking
319    /// for [`DynGlobalClientContext::fund_output`].
320    /// Returns transaction id of the funding transaction and an optional
321    /// `OutPoint` that represents change if change was added.
322    async fn fund_output_dyn(
323        &self,
324        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
325        outputs: InstancelessDynClientOutputBundle,
326    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;
327
328    /// Adds a state machine to the executor.
329    async fn add_state_machine_dyn(
330        &self,
331        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
332        sm: Box<maybe_add_send_sync!(dyn IState)>,
333    ) -> AddStateMachinesResult;
334
335    async fn log_event_json(
336        &self,
337        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
338        kind: EventKind,
339        module: Option<(ModuleKind, ModuleInstanceId)>,
340        payload: serde_json::Value,
341        transient: bool,
342    );
343
344    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM>;
345}
346
347#[apply(async_trait_maybe_send!)]
348impl IGlobalClientContext for () {
349    fn module_api(&self) -> DynModuleApi {
350        unimplemented!("fake implementation, only for tests");
351    }
352
353    async fn client_config(&self) -> ClientConfig {
354        unimplemented!("fake implementation, only for tests");
355    }
356
357    fn api(&self) -> &DynGlobalApi {
358        unimplemented!("fake implementation, only for tests");
359    }
360
361    fn decoders(&self) -> &ModuleDecoderRegistry {
362        unimplemented!("fake implementation, only for tests");
363    }
364
365    async fn claim_inputs_dyn(
366        &self,
367        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
368        _input: InstancelessDynClientInputBundle,
369    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
370        unimplemented!("fake implementation, only for tests");
371    }
372
373    async fn fund_output_dyn(
374        &self,
375        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
376        _outputs: InstancelessDynClientOutputBundle,
377    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
378        unimplemented!("fake implementation, only for tests");
379    }
380
381    async fn add_state_machine_dyn(
382        &self,
383        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
384        _sm: Box<maybe_add_send_sync!(dyn IState)>,
385    ) -> AddStateMachinesResult {
386        unimplemented!("fake implementation, only for tests");
387    }
388
389    async fn log_event_json(
390        &self,
391        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
392        _kind: EventKind,
393        _module: Option<(ModuleKind, ModuleInstanceId)>,
394        _payload: serde_json::Value,
395        _transient: bool,
396    ) {
397        unimplemented!("fake implementation, only for tests");
398    }
399
400    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
401        unimplemented!("fake implementation, only for tests");
402    }
403}
404
405dyn_newtype_define! {
406    /// Global state and functionality provided to all state machines running in the
407    /// client
408    #[derive(Clone)]
409    pub DynGlobalClientContext(Arc<IGlobalClientContext>)
410}
411
412impl DynGlobalClientContext {
413    pub fn new_fake() -> Self {
414        DynGlobalClientContext::from(())
415    }
416
417    pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
418        self.transaction_update_stream()
419            .await
420            .filter_map(|tx_update| {
421                std::future::ready(match tx_update.state {
422                    TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
423                    TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
424                        Some(Err(submit_error))
425                    }
426                    _ => None,
427                })
428            })
429            .next_or_pending()
430            .await
431    }
432
433    pub async fn claim_inputs<I, S>(
434        &self,
435        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
436        inputs: ClientInputBundle<I, S>,
437    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
438    where
439        I: IInput + MaybeSend + MaybeSync + 'static,
440        S: IState + MaybeSend + MaybeSync + 'static,
441    {
442        self.claim_inputs_dyn(dbtx, inputs.into_instanceless())
443            .await
444    }
445
446    /// Creates a transaction with the supplied output and funding added by the
447    /// primary module if possible. If the primary module does not have the
448    /// required funds this function fails.
449    ///
450    /// The transactions submission state machine as well as the state machines
451    /// for the funding inputs are generated automatically. The caller is
452    /// responsible for the output's state machines, should there be any
453    /// required.
454    pub async fn fund_output<O, S>(
455        &self,
456        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
457        outputs: ClientOutputBundle<O, S>,
458    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
459    where
460        O: IOutput + MaybeSend + MaybeSync + 'static,
461        S: IState + MaybeSend + MaybeSync + 'static,
462    {
463        self.fund_output_dyn(dbtx, outputs.into_instanceless())
464            .await
465    }
466
467    /// Allows adding state machines from inside a transition to the executor.
468    /// The added state machine belongs to the same module instance as the state
469    /// machine from inside which it was spawned.
470    pub async fn add_state_machine<S>(
471        &self,
472        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
473        sm: S,
474    ) -> AddStateMachinesResult
475    where
476        S: State + MaybeSend + MaybeSync + 'static,
477    {
478        self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
479    }
480
481    async fn log_event<E>(&self, dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, event: E)
482    where
483        E: Event + Send,
484    {
485        self.log_event_json(
486            dbtx,
487            E::KIND,
488            E::MODULE.map(|m| (m, dbtx.module_id())),
489            serde_json::to_value(&event).expect("Payload serialization can't fail"),
490            <E as Event>::PERSIST,
491        )
492        .await;
493    }
494}
495
496fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>(
497    state_gen: StateGenerator<S>,
498) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> {
499    Arc::new(move |out_point_range| {
500        let states: Vec<S> = state_gen(out_point_range);
501        states
502            .into_iter()
503            .map(|state| box_up_state(state))
504            .collect()
505    })
506}
507
508/// Not sure why I couldn't just directly call `Box::new` ins
509/// [`states_to_instanceless_dyn`], but this fixed it.
510fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> {
511    Box::new(state)
512}
513
514impl<T> From<Arc<T>> for DynGlobalClientContext
515where
516    T: IGlobalClientContext,
517{
518    fn from(inner: Arc<T>) -> Self {
519        DynGlobalClientContext { inner }
520    }
521}
522
523// TODO: impl `Debug` for `Client` and derive here
524impl Debug for Client {
525    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
526        write!(f, "Client")
527    }
528}
529
530/// Global state given to a specific client module and state. It is aware inside
531/// which module instance and operation it is used and to avoid module being
532/// aware of their instance id etc.
533#[derive(Clone, Debug)]
534struct ModuleGlobalClientContext {
535    client: Arc<Client>,
536    module_instance_id: ModuleInstanceId,
537    operation: OperationId,
538}
539
540#[apply(async_trait_maybe_send!)]
541impl IGlobalClientContext for ModuleGlobalClientContext {
542    fn module_api(&self) -> DynModuleApi {
543        self.api().with_module(self.module_instance_id)
544    }
545
546    fn api(&self) -> &DynGlobalApi {
547        &self.client.api
548    }
549
550    fn decoders(&self) -> &ModuleDecoderRegistry {
551        self.client.decoders()
552    }
553
554    async fn client_config(&self) -> ClientConfig {
555        self.client.config().await
556    }
557
558    async fn claim_inputs_dyn(
559        &self,
560        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
561        inputs: InstancelessDynClientInputBundle,
562    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
563        let tx_builder =
564            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
565
566        self.client
567            .finalize_and_submit_transaction_inner(
568                &mut dbtx.global_tx().to_ref_nc(),
569                self.operation,
570                tx_builder,
571            )
572            .await
573    }
574
575    async fn fund_output_dyn(
576        &self,
577        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
578        outputs: InstancelessDynClientOutputBundle,
579    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
580        let tx_builder =
581            TransactionBuilder::new().with_outputs(outputs.into_dyn(self.module_instance_id));
582
583        self.client
584            .finalize_and_submit_transaction_inner(
585                &mut dbtx.global_tx().to_ref_nc(),
586                self.operation,
587                tx_builder,
588            )
589            .await
590    }
591
592    async fn add_state_machine_dyn(
593        &self,
594        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
595        sm: Box<maybe_add_send_sync!(dyn IState)>,
596    ) -> AddStateMachinesResult {
597        let state = DynState::from_parts(self.module_instance_id, sm);
598
599        self.client
600            .executor
601            .add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
602            .await
603    }
604
605    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
606        self.client.transaction_update_stream(self.operation).await
607    }
608
609    async fn log_event_json(
610        &self,
611        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
612        kind: EventKind,
613        module: Option<(ModuleKind, ModuleInstanceId)>,
614        payload: serde_json::Value,
615        transient: bool,
616    ) {
617        self.client
618            .log_event_raw_dbtx(
619                dbtx.global_tx(),
620                kind,
621                module,
622                serde_json::to_vec(&payload).expect("Serialization can't fail"),
623                transient,
624            )
625            .await;
626    }
627}
628
629fn states_add_instance(
630    module_instance_id: ModuleInstanceId,
631    state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
632) -> StateGenerator<DynState> {
633    Arc::new(move |out_point_range| {
634        let states = state_gen(out_point_range);
635        Iterator::collect(
636            states
637                .into_iter()
638                .map(|state| DynState::from_parts(module_instance_id, state)),
639        )
640    })
641}
642
643/// User handle to the [`Client`] instance
644///
645/// On the drop of [`ClientHandle`] the client will be shut-down, and resources
646/// it used freed.
647///
648/// Notably it [`ops::Deref`]s to the [`Client`] where most
649/// methods live.
650///
651/// Put this in an Arc to clone it (see [`ClientHandleArc`]).
652#[derive(Debug)]
653pub struct ClientHandle {
654    inner: Option<Arc<Client>>,
655}
656
657/// An alias for a reference counted [`ClientHandle`]
658pub type ClientHandleArc = Arc<ClientHandle>;
659
660impl ClientHandle {
661    /// Create
662    fn new(inner: Arc<Client>) -> Self {
663        ClientHandle {
664            inner: inner.into(),
665        }
666    }
667
668    fn as_inner(&self) -> &Arc<Client> {
669        self.inner.as_ref().expect("Inner always set")
670    }
671
672    pub fn start_executor(&self) {
673        self.as_inner().start_executor();
674    }
675
676    /// Shutdown the client.
677    pub async fn shutdown(mut self) {
678        self.shutdown_inner().await;
679    }
680
681    async fn shutdown_inner(&mut self) {
682        let Some(inner) = self.inner.take() else {
683            error!("ClientHandleShared::shutdown called twice");
684            return;
685        };
686        inner.executor.stop_executor();
687        let db = inner.db.clone();
688        debug!(target: LOG_CLIENT, "Waiting for client task group to shut down");
689        if let Err(err) = inner
690            .task_group
691            .clone()
692            .shutdown_join_all(Some(Duration::from_secs(30)))
693            .await
694        {
695            warn!(target: LOG_CLIENT, %err, "Error waiting for client task group to shut down");
696        }
697
698        let client_strong_count = Arc::strong_count(&inner);
699        debug!(target: LOG_CLIENT, "Dropping last handle to Client");
700        // We are sure that no background tasks are running in the client anymore, so we
701        // can drop the (usually) last inner reference.
702        drop(inner);
703
704        if client_strong_count != 1 {
705            debug!(target: LOG_CLIENT, count = client_strong_count - 1, LOG_CLIENT, "External Client references remaining after last handle dropped");
706        }
707
708        let db_strong_count = db.strong_count();
709        if db_strong_count != 1 {
710            debug!(target: LOG_CLIENT, count = db_strong_count - 1, "External DB references remaining after last handle dropped");
711        }
712        trace!(target: LOG_CLIENT, "Dropped last handle to Client");
713    }
714
715    /// Restart the client
716    ///
717    /// Returns false if there are other clones of [`ClientHandle`], or starting
718    /// the client again failed for some reason.
719    ///
720    /// Notably it will re-use the original [`Database`] handle, and not attempt
721    /// to open it again.
722    pub async fn restart(self) -> anyhow::Result<ClientHandle> {
723        let (builder, config, api_secret, root_secret) = {
724            let client = self
725                .inner
726                .as_ref()
727                .ok_or_else(|| format_err!("Already stopped"))?;
728            let builder = ClientBuilder::from_existing(client);
729            let config = client.config().await;
730            let api_secret = client.api_secret.clone();
731            let root_secret = client.root_secret.clone();
732
733            (builder, config, api_secret, root_secret)
734        };
735        self.shutdown().await;
736
737        builder.build(root_secret, config, api_secret, false).await
738    }
739}
740
741impl ops::Deref for ClientHandle {
742    type Target = Client;
743
744    fn deref(&self) -> &Self::Target {
745        self.inner.as_ref().expect("Must have inner client set")
746    }
747}
748
749impl ClientHandle {
750    pub(crate) fn downgrade(&self) -> ClientWeak {
751        ClientWeak {
752            inner: Arc::downgrade(self.inner.as_ref().expect("Inner always set")),
753        }
754    }
755}
756
757/// Internal self-reference to [`Client`]
758#[derive(Debug, Clone)]
759pub(crate) struct ClientStrong {
760    inner: Arc<Client>,
761}
762
763impl ops::Deref for ClientStrong {
764    type Target = Client;
765
766    fn deref(&self) -> &Self::Target {
767        self.inner.deref()
768    }
769}
770
771/// Like [`ClientStrong`] but using a [`Weak`] handle to [`Client`]
772///
773/// This is not meant to be used by external code.
774#[derive(Debug, Clone)]
775pub(crate) struct ClientWeak {
776    inner: Weak<Client>,
777}
778
779impl ClientWeak {
780    pub fn upgrade(&self) -> Option<ClientStrong> {
781        Weak::upgrade(&self.inner).map(|inner| ClientStrong { inner })
782    }
783}
784
785/// We need a separate drop implementation for `Client` that triggers
786/// `Executor::stop_executor` even though the `Drop` implementation of
787/// `ExecutorInner` should already take care of that. The reason is that as long
788/// as the executor task is active there may be a cycle in the
789/// `Arc<Client>`s such that at least one `Executor` never gets dropped.
790impl Drop for ClientHandle {
791    fn drop(&mut self) {
792        if self.inner.is_none() {
793            return;
794        }
795
796        // We can't use block_on in single-threaded mode or wasm
797        #[cfg(target_family = "wasm")]
798        let can_block = false;
799        #[cfg(not(target_family = "wasm"))]
800        // nosemgrep: ban-raw-block-on
801        let can_block = RuntimeHandle::current().runtime_flavor() != RuntimeFlavor::CurrentThread;
802        if !can_block {
803            let inner = self.inner.take().expect("Must have inner client set");
804            inner.executor.stop_executor();
805            if cfg!(target_family = "wasm") {
806                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on wasm, call ClientHandle::shutdown manually.");
807            } else {
808                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on current thread runtime, call ClientHandle::shutdown manually.");
809            }
810            return;
811        }
812
813        debug!(target: LOG_CLIENT, "Shutting down the Client on last handle drop");
814        #[cfg(not(target_family = "wasm"))]
815        runtime::block_in_place(|| {
816            runtime::block_on(self.shutdown_inner());
817        });
818    }
819}
820
821/// List of core api versions supported by the implementation.
822/// Notably `major` version is the one being supported, and corresponding
823/// `minor` version is the one required (for given `major` version).
824const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
825    &[ApiVersion { major: 0, minor: 0 }];
826
827pub type ModuleGlobalContextGen = ContextGen;
828
829/// Resources particular to a module instance
830pub struct ClientModuleInstance<'m, M: ClientModule> {
831    /// Instance id of the module
832    pub id: ModuleInstanceId,
833    /// Module-specific DB
834    pub db: Database,
835    /// Module-specific API
836    pub api: DynModuleApi,
837
838    module: &'m M,
839}
840
841impl<'m, M: ClientModule> ClientModuleInstance<'m, M> {
842    /// Get a reference to the module
843    pub fn inner(&self) -> &'m M {
844        self.module
845    }
846}
847
848impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
849where
850    M: ClientModule,
851{
852    type Target = M;
853
854    fn deref(&self) -> &Self::Target {
855        self.module
856    }
857}
858
859/// Main client type
860///
861/// A handle and API to interacting with a single federation. End user
862/// applications that want to support interacting with multiple federations at
863/// the same time, will need to instantiate and manage multiple instances of
864/// this struct.
865///
866/// Under the hood it is starting and managing service tasks, state machines,
867/// database and other resources required.
868///
869/// This type is shared externally and internally, and
870/// [`ClientHandle`] is responsible for external lifecycle management
871/// and resource freeing of the [`Client`].
872pub struct Client {
873    config: RwLock<ClientConfig>,
874    api_secret: Option<String>,
875    decoders: ModuleDecoderRegistry,
876    db: Database,
877    federation_id: FederationId,
878    federation_meta: BTreeMap<String, String>,
879    primary_module_instance: ModuleInstanceId,
880    modules: ClientModuleRegistry,
881    module_inits: ClientModuleInitRegistry,
882    executor: Executor,
883    api: DynGlobalApi,
884    root_secret: DerivableSecret,
885    operation_log: OperationLog,
886    secp_ctx: Secp256k1<secp256k1::All>,
887    meta_service: Arc<MetaService>,
888    connector: Connector,
889
890    task_group: TaskGroup,
891
892    /// Updates about client recovery progress
893    client_recovery_progress_receiver:
894        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
895
896    /// Internal client sender to wake up log ordering task every time a
897    /// (unuordered) log event is added.
898    log_ordering_wakeup_tx: watch::Sender<()>,
899    /// Receiver for events fired every time (ordered) log event is added.
900    log_event_added_rx: watch::Receiver<()>,
901    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
902}
903
904impl Client {
905    /// Initialize a client builder that can be configured to create a new
906    /// client.
907    pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
908        apply_migrations_core_client(
909            &db,
910            "fedimint-client".to_string(),
911            get_core_client_database_migrations(),
912        )
913        .await?;
914        Ok(ClientBuilder::new(db))
915    }
916
917    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
918        self.api.as_ref()
919    }
920
921    pub fn api_clone(&self) -> DynGlobalApi {
922        self.api.clone()
923    }
924
925    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
926    pub fn task_group(&self) -> &TaskGroup {
927        &self.task_group
928    }
929
930    /// Useful for our CLI tooling, not meant for external use
931    #[doc(hidden)]
932    pub fn executor(&self) -> &Executor {
933        &self.executor
934    }
935
936    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
937        let mut dbtx = db.begin_transaction_nc().await;
938        dbtx.get_value(&ClientConfigKey).await
939    }
940
941    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
942        let mut dbtx = db.begin_transaction_nc().await;
943        dbtx.get_value(&ApiSecretKey).await
944    }
945
946    pub async fn store_encodable_client_secret<T: Encodable>(
947        db: &Database,
948        secret: T,
949    ) -> anyhow::Result<()> {
950        let mut dbtx = db.begin_transaction().await;
951
952        // Don't overwrite an existing secret
953        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
954            bail!("Encoded client secret already exists, cannot overwrite")
955        }
956
957        let encoded_secret = T::consensus_encode_to_vec(&secret);
958        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
959            .await;
960        dbtx.commit_tx().await;
961        Ok(())
962    }
963
964    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
965        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
966            bail!("Encoded client secret not present in DB")
967        };
968
969        Ok(secret)
970    }
971    pub async fn load_decodable_client_secret_opt<T: Decodable>(
972        db: &Database,
973    ) -> anyhow::Result<Option<T>> {
974        let mut dbtx = db.begin_transaction_nc().await;
975
976        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
977
978        Ok(match client_secret {
979            Some(client_secret) => Some(
980                T::consensus_decode(&mut client_secret.as_slice(), &ModuleRegistry::default())
981                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
982            ),
983            None => None,
984        })
985    }
986
987    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
988        let client_secret =
989            if let Ok(secret) = Self::load_decodable_client_secret::<[u8; 64]>(db).await {
990                secret
991            } else {
992                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
993                Self::store_encodable_client_secret(db, secret)
994                    .await
995                    .expect("Storing client secret must work");
996                secret
997            };
998        Ok(client_secret)
999    }
1000
1001    pub async fn is_initialized(db: &Database) -> bool {
1002        Self::get_config_from_db(db).await.is_some()
1003    }
1004
1005    pub fn start_executor(self: &Arc<Self>) {
1006        debug!(
1007            "Starting fedimint client executor (version: {})",
1008            fedimint_build_code_version_env!()
1009        );
1010        self.executor.start_executor(self.context_gen());
1011    }
1012
1013    pub fn federation_id(&self) -> FederationId {
1014        self.federation_id
1015    }
1016
1017    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
1018        let client_inner = Arc::downgrade(self);
1019        Arc::new(move |module_instance, operation| {
1020            ModuleGlobalClientContext {
1021                client: client_inner
1022                    .clone()
1023                    .upgrade()
1024                    .expect("ModuleGlobalContextGen called after client was dropped"),
1025                module_instance_id: module_instance,
1026                operation,
1027            }
1028            .into()
1029        })
1030    }
1031
1032    pub async fn config(&self) -> ClientConfig {
1033        self.config.read().await.clone()
1034    }
1035
1036    pub fn api_secret(&self) -> &Option<String> {
1037        &self.api_secret
1038    }
1039
1040    pub fn decoders(&self) -> &ModuleDecoderRegistry {
1041        &self.decoders
1042    }
1043
1044    /// Returns a reference to the module, panics if not found
1045    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1046        self.try_get_module(instance)
1047            .expect("Module instance not found")
1048    }
1049
1050    fn try_get_module(
1051        &self,
1052        instance: ModuleInstanceId,
1053    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
1054        Some(self.modules.get(instance)?.as_ref())
1055    }
1056
1057    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
1058        self.modules.get(instance).is_some()
1059    }
1060
1061    /// Returns the input amount and output amount of a transaction
1062    ///
1063    /// # Panics
1064    /// If any of the input or output versions in the transaction builder are
1065    /// unknown by the respective module.
1066    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
1067        // FIXME: prevent overflows, currently not suitable for untrusted input
1068        let mut in_amount = Amount::ZERO;
1069        let mut out_amount = Amount::ZERO;
1070        let mut fee_amount = Amount::ZERO;
1071
1072        for input in builder.inputs() {
1073            let module = self.get_module(input.input.module_instance_id());
1074
1075            let item_fee = module.input_fee(input.amount, &input.input).expect(
1076                "We only build transactions with input versions that are supported by the module",
1077            );
1078
1079            in_amount += input.amount;
1080            fee_amount += item_fee;
1081        }
1082
1083        for output in builder.outputs() {
1084            let module = self.get_module(output.output.module_instance_id());
1085
1086            let item_fee = module.output_fee(output.amount, &output.output).expect(
1087                "We only build transactions with output versions that are supported by the module",
1088            );
1089
1090            out_amount += output.amount;
1091            fee_amount += item_fee;
1092        }
1093
1094        (in_amount, out_amount + fee_amount)
1095    }
1096
1097    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1098        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
1099    }
1100
1101    pub fn get_meta(&self, key: &str) -> Option<String> {
1102        self.federation_meta.get(key).cloned()
1103    }
1104
1105    fn root_secret(&self) -> DerivableSecret {
1106        self.root_secret.clone()
1107    }
1108
1109    pub async fn add_state_machines(
1110        &self,
1111        dbtx: &mut DatabaseTransaction<'_>,
1112        states: Vec<DynState>,
1113    ) -> AddStateMachinesResult {
1114        self.executor.add_state_machines_dbtx(dbtx, states).await
1115    }
1116
1117    // TODO: implement as part of [`OperationLog`]
1118    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
1119        let active_states = self.executor.get_active_states().await;
1120        let mut active_operations = HashSet::with_capacity(active_states.len());
1121        let mut dbtx = self.db().begin_transaction_nc().await;
1122        for (state, _) in active_states {
1123            let operation_id = state.operation_id();
1124            if dbtx
1125                .get_value(&OperationLogKey { operation_id })
1126                .await
1127                .is_some()
1128            {
1129                active_operations.insert(operation_id);
1130            }
1131        }
1132        active_operations
1133    }
1134
1135    pub fn operation_log(&self) -> &OperationLog {
1136        &self.operation_log
1137    }
1138
1139    /// Get the meta manager to read meta fields.
1140    pub fn meta_service(&self) -> &Arc<MetaService> {
1141        &self.meta_service
1142    }
1143
1144    /// Adds funding to a transaction or removes over-funding via change.
1145    async fn finalize_transaction(
1146        &self,
1147        dbtx: &mut DatabaseTransaction<'_>,
1148        operation_id: OperationId,
1149        mut partial_transaction: TransactionBuilder,
1150    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
1151        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1152
1153        let (added_input_bundle, change_outputs) = self
1154            .primary_module()
1155            .create_final_inputs_and_outputs(
1156                self.primary_module_instance,
1157                dbtx,
1158                operation_id,
1159                input_amount,
1160                output_amount,
1161            )
1162            .await?;
1163
1164        // This is the range of  outputs that will be added to the transaction
1165        // in order to balance it. Notice that it may stay empty in case the transaction
1166        // is already balanced.
1167        let change_range = Range {
1168            start: partial_transaction.outputs().count() as u64,
1169            end: (partial_transaction.outputs().count() + change_outputs.outputs.len()) as u64,
1170        };
1171
1172        partial_transaction = partial_transaction
1173            .with_inputs(added_input_bundle)
1174            .with_outputs(change_outputs);
1175
1176        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1177
1178        assert!(input_amount >= output_amount, "Transaction is underfunded");
1179
1180        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
1181
1182        Ok((tx, states, change_range))
1183    }
1184
1185    /// Add funding and/or change to the transaction builder as needed, finalize
1186    /// the transaction and submit it to the federation.
1187    ///
1188    /// ## Errors
1189    /// The function will return an error if the operation with given ID already
1190    /// exists.
1191    ///
1192    /// ## Panics
1193    /// The function will panic if the database transaction collides with
1194    /// other and fails with others too often, this should not happen except for
1195    /// excessively concurrent scenarios.
1196    pub async fn finalize_and_submit_transaction<F, M>(
1197        &self,
1198        operation_id: OperationId,
1199        operation_type: &str,
1200        operation_meta: F,
1201        tx_builder: TransactionBuilder,
1202    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
1203    where
1204        F: Fn(TransactionId, Vec<OutPoint>) -> M + Clone + MaybeSend + MaybeSync,
1205        M: serde::Serialize + MaybeSend,
1206    {
1207        let operation_type = operation_type.to_owned();
1208
1209        let autocommit_res = self
1210            .db
1211            .autocommit(
1212                |dbtx, _| {
1213                    let operation_type = operation_type.clone();
1214                    let tx_builder = tx_builder.clone();
1215                    let operation_meta = operation_meta.clone();
1216                    Box::pin(async move {
1217                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
1218                            bail!("There already exists an operation with id {operation_id:?}")
1219                        }
1220
1221                        let (txid, change) = self
1222                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
1223                            .await?;
1224
1225                        self.operation_log()
1226                            .add_operation_log_entry(
1227                                dbtx,
1228                                operation_id,
1229                                &operation_type,
1230                                operation_meta(txid, change.clone()),
1231                            )
1232                            .await;
1233
1234                        Ok((txid, change))
1235                    })
1236                },
1237                Some(100), // TODO: handle what happens after 100 retries
1238            )
1239            .await;
1240
1241        match autocommit_res {
1242            Ok(txid) => Ok(txid),
1243            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
1244            Err(AutocommitError::CommitFailed {
1245                attempts,
1246                last_error,
1247            }) => panic!(
1248                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
1249            ),
1250        }
1251    }
1252
1253    async fn finalize_and_submit_transaction_inner(
1254        &self,
1255        dbtx: &mut DatabaseTransaction<'_>,
1256        operation_id: OperationId,
1257        tx_builder: TransactionBuilder,
1258    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
1259        let (transaction, mut states, change_range) = self
1260            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
1261            .await?;
1262
1263        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
1264            let inputs = transaction
1265                .inputs
1266                .iter()
1267                .map(DynInput::module_instance_id)
1268                .collect::<Vec<_>>();
1269            let outputs = transaction
1270                .outputs
1271                .iter()
1272                .map(DynOutput::module_instance_id)
1273                .collect::<Vec<_>>();
1274            warn!(
1275                target: LOG_CLIENT_NET_API,
1276                size=%transaction.consensus_encode_to_vec().len(),
1277                ?inputs,
1278                ?outputs,
1279                "Transaction too large",
1280            );
1281            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
1282            bail!(
1283                "The generated transaction would be rejected by the federation for being too large."
1284            );
1285        }
1286
1287        let txid = transaction.tx_hash();
1288
1289        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
1290
1291        let change_outpoints = change_range
1292            .into_iter()
1293            .map(|out_idx| OutPoint { txid, out_idx })
1294            .collect();
1295
1296        let tx_submission_sm = DynState::from_typed(
1297            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1298            TxSubmissionStatesSM {
1299                operation_id,
1300                state: TxSubmissionStates::Created(transaction),
1301            },
1302        );
1303        states.push(tx_submission_sm);
1304
1305        self.executor.add_state_machines_dbtx(dbtx, states).await?;
1306
1307        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
1308            .await;
1309
1310        Ok((txid, change_outpoints))
1311    }
1312
1313    async fn transaction_update_stream(
1314        &self,
1315        operation_id: OperationId,
1316    ) -> BoxStream<'static, TxSubmissionStatesSM> {
1317        self.executor
1318            .notifier()
1319            .module_notifier::<TxSubmissionStatesSM>(TRANSACTION_SUBMISSION_MODULE_INSTANCE)
1320            .subscribe(operation_id)
1321            .await
1322    }
1323
1324    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
1325        let mut dbtx = self.db().begin_transaction_nc().await;
1326
1327        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
1328    }
1329
1330    pub async fn operation_exists_dbtx(
1331        dbtx: &mut DatabaseTransaction<'_>,
1332        operation_id: OperationId,
1333    ) -> bool {
1334        let active_state_exists = dbtx
1335            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1336            .await
1337            .next()
1338            .await
1339            .is_some();
1340
1341        let inactive_state_exists = dbtx
1342            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
1343            .await
1344            .next()
1345            .await
1346            .is_some();
1347
1348        active_state_exists || inactive_state_exists
1349    }
1350
1351    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
1352        self.db
1353            .begin_transaction_nc()
1354            .await
1355            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1356            .await
1357            .next()
1358            .await
1359            .is_some()
1360    }
1361
1362    /// Waits for an output from the primary module to reach its final
1363    /// state.
1364    pub async fn await_primary_module_output(
1365        &self,
1366        operation_id: OperationId,
1367        out_point: OutPoint,
1368    ) -> anyhow::Result<()> {
1369        self.primary_module()
1370            .await_primary_module_output(operation_id, out_point)
1371            .await
1372    }
1373
1374    /// Returns a reference to a typed module client instance by kind
1375    pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
1376        let module_kind = M::kind();
1377        let id = self
1378            .get_first_instance(&module_kind)
1379            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
1380        let module: &M = self
1381            .try_get_module(id)
1382            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
1383            .as_any()
1384            .downcast_ref::<M>()
1385            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
1386        let (db, _) = self.db().with_prefix_module_id(id);
1387        Ok(ClientModuleInstance {
1388            id,
1389            db,
1390            api: self.api().with_module(id),
1391            module,
1392        })
1393    }
1394
1395    pub fn get_module_client_dyn(
1396        &self,
1397        instance_id: ModuleInstanceId,
1398    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
1399        self.try_get_module(instance_id)
1400            .ok_or(anyhow!("Unknown module instance {}", instance_id))
1401    }
1402
1403    pub fn db(&self) -> &Database {
1404        &self.db
1405    }
1406
1407    /// Returns a stream of transaction updates for the given operation id that
1408    /// can later be used to watch for a specific transaction being accepted.
1409    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1410        TransactionUpdates {
1411            update_stream: self.transaction_update_stream(operation_id).await,
1412        }
1413    }
1414
1415    /// Returns the instance id of the first module of the given kind. The
1416    /// primary module will always be returned before any other modules (which
1417    /// themselves are ordered by their instance ID).
1418    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1419        if self
1420            .modules
1421            .get_with_kind(self.primary_module_instance)
1422            .is_some_and(|(kind, _)| kind == module_kind)
1423        {
1424            return Some(self.primary_module_instance);
1425        }
1426
1427        self.modules
1428            .iter_modules()
1429            .find(|(_, kind, _module)| *kind == module_kind)
1430            .map(|(instance_id, _, _)| instance_id)
1431    }
1432
1433    /// Returns the data from which the client's root secret is derived (e.g.
1434    /// BIP39 seed phrase struct).
1435    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
1436        get_decoded_client_secret::<T>(self.db()).await
1437    }
1438
1439    /// Waits for outputs from the primary module to reach its final
1440    /// state.
1441    pub async fn await_primary_module_outputs(
1442        &self,
1443        operation_id: OperationId,
1444        outputs: Vec<OutPoint>,
1445    ) -> anyhow::Result<()> {
1446        for out_point in outputs {
1447            self.await_primary_module_output(operation_id, out_point)
1448                .await?;
1449        }
1450
1451        Ok(())
1452    }
1453
1454    /// Returns the config of the client in JSON format.
1455    ///
1456    /// Compared to the consensus module format where module configs are binary
1457    /// encoded this format cannot be cryptographically verified but is easier
1458    /// to consume and to some degree human-readable.
1459    pub async fn get_config_json(&self) -> JsonClientConfig {
1460        self.config().await.to_json()
1461    }
1462
1463    /// Get the primary module
1464    pub fn primary_module(&self) -> &DynClientModule {
1465        self.modules
1466            .get(self.primary_module_instance)
1467            .expect("primary module must be present")
1468    }
1469
1470    /// Balance available to the client for spending
1471    pub async fn get_balance(&self) -> Amount {
1472        self.primary_module()
1473            .get_balance(
1474                self.primary_module_instance,
1475                &mut self.db().begin_transaction_nc().await,
1476            )
1477            .await
1478    }
1479
1480    /// Returns a stream that yields the current client balance every time it
1481    /// changes.
1482    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
1483        let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
1484        let initial_balance = self.get_balance().await;
1485        let db = self.db().clone();
1486        let primary_module = self.primary_module().clone();
1487        let primary_module_instance = self.primary_module_instance;
1488
1489        Box::pin(stream! {
1490            yield initial_balance;
1491            let mut prev_balance = initial_balance;
1492            while let Some(()) = balance_changes.next().await {
1493                let mut dbtx = db.begin_transaction_nc().await;
1494                let balance = primary_module
1495                    .get_balance(primary_module_instance, &mut dbtx)
1496                    .await;
1497
1498                // Deduplicate in case modules cannot always tell if the balance actually changed
1499                if balance != prev_balance {
1500                    prev_balance = balance;
1501                    yield balance;
1502                }
1503            }
1504        })
1505    }
1506
1507    /// Query the federation for API version support and then calculate
1508    /// the best API version to use (supported by most guardians).
1509    pub async fn refresh_peers_api_versions(
1510        num_peers: NumPeers,
1511        api: DynGlobalApi,
1512        db: Database,
1513        num_responses_sender: watch::Sender<usize>,
1514    ) {
1515        // Make a single request to a peer after a delay
1516        //
1517        // The delay is here to unify the type of a future both for initial request and
1518        // possible retries.
1519        async fn make_request(
1520            delay: Duration,
1521            peer_id: PeerId,
1522            api: &DynGlobalApi,
1523        ) -> (
1524            PeerId,
1525            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
1526        ) {
1527            runtime::sleep(delay).await;
1528            (
1529                peer_id,
1530                api.request_single_peer_typed::<SupportedApiVersionsSummary>(
1531                    None,
1532                    VERSION_ENDPOINT.to_owned(),
1533                    ApiRequestErased::default(),
1534                    peer_id,
1535                )
1536                .await,
1537            )
1538        }
1539
1540        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
1541        // and make a single async db write operation, it should be OK.
1542        let mut requests = FuturesUnordered::new();
1543
1544        for peer_id in num_peers.peer_ids() {
1545            requests.push(make_request(Duration::ZERO, peer_id, &api));
1546        }
1547
1548        let mut num_responses = 0;
1549
1550        while let Some((peer_id, response)) = requests.next().await {
1551            match response {
1552                Err(err) => {
1553                    if db
1554                        .begin_transaction_nc()
1555                        .await
1556                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1557                        .await
1558                        .is_some()
1559                    {
1560                        debug!(target: LOG_CLIENT, %peer_id, %err, "Failed to refresh API versions of a peer, but we have a previous response");
1561                    } else {
1562                        debug!(target: LOG_CLIENT, %peer_id, %err, "Failed to refresh API versions of a peer, will retry");
1563                        requests.push(make_request(Duration::from_secs(15), peer_id, &api));
1564                    }
1565                }
1566                Ok(o) => {
1567                    // Save the response to the database right away, just to
1568                    // not lose it
1569                    let mut dbtx = db.begin_transaction().await;
1570                    dbtx.insert_entry(
1571                        &PeerLastApiVersionsSummaryKey(peer_id),
1572                        &PeerLastApiVersionsSummary(o),
1573                    )
1574                    .await;
1575                    dbtx.commit_tx().await;
1576                    num_responses += 1;
1577                    // ignore errors: we don't care if anyone is still listening
1578                    let _ = num_responses_sender.send(num_responses);
1579                }
1580            }
1581        }
1582    }
1583
1584    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1585    pub fn supported_api_versions_summary_static(
1586        config: &ClientConfig,
1587        client_module_init: &ClientModuleInitRegistry,
1588    ) -> SupportedApiVersionsSummary {
1589        SupportedApiVersionsSummary {
1590            core: SupportedCoreApiVersions {
1591                core_consensus: config.global.consensus_version,
1592                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1593                    .expect("must not have conflicting versions"),
1594            },
1595            modules: config
1596                .modules
1597                .iter()
1598                .filter_map(|(&module_instance_id, module_config)| {
1599                    client_module_init
1600                        .get(module_config.kind())
1601                        .map(|module_init| {
1602                            (
1603                                module_instance_id,
1604                                SupportedModuleApiVersions {
1605                                    core_consensus: config.global.consensus_version,
1606                                    module_consensus: module_config.version,
1607                                    api: module_init.supported_api_versions(),
1608                                },
1609                            )
1610                        })
1611                })
1612                .collect(),
1613        }
1614    }
1615
1616    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1617        Self::load_and_refresh_common_api_version_static(
1618            &self.config().await,
1619            &self.module_inits,
1620            &self.api,
1621            &self.db,
1622            &self.task_group,
1623        )
1624        .await
1625    }
1626
1627    /// Load the common api versions to use from cache and start a background
1628    /// process to refresh them.
1629    ///
1630    /// This is a compromise, so we not have to wait for version discovery to
1631    /// complete every time a [`Client`] is being built.
1632    async fn load_and_refresh_common_api_version_static(
1633        config: &ClientConfig,
1634        module_init: &ClientModuleInitRegistry,
1635        api: &DynGlobalApi,
1636        db: &Database,
1637        task_group: &TaskGroup,
1638    ) -> anyhow::Result<ApiVersionSet> {
1639        if let Some(v) = db
1640            .begin_transaction_nc()
1641            .await
1642            .get_value(&CachedApiVersionSetKey)
1643            .await
1644        {
1645            debug!("Found existing cached common api versions");
1646            let config = config.clone();
1647            let client_module_init = module_init.clone();
1648            let api = api.clone();
1649            let db = db.clone();
1650            let task_group = task_group.clone();
1651            // Separate task group, because we actually don't want to be waiting for this to
1652            // finish, and it's just best effort.
1653            task_group
1654                .clone()
1655                .spawn_cancellable("refresh_common_api_version_static", async move {
1656                    if let Err(error) = Self::refresh_common_api_version_static(
1657                        &config,
1658                        &client_module_init,
1659                        &api,
1660                        &db,
1661                        task_group,
1662                    )
1663                    .await
1664                    {
1665                        warn!(%error, "Failed to discover common api versions");
1666                    }
1667                });
1668
1669            return Ok(v.0);
1670        }
1671
1672        debug!("No existing cached common api versions found, waiting for initial discovery");
1673        Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
1674            .await
1675    }
1676
1677    async fn refresh_common_api_version_static(
1678        config: &ClientConfig,
1679        client_module_init: &ClientModuleInitRegistry,
1680        api: &DynGlobalApi,
1681        db: &Database,
1682        task_group: TaskGroup,
1683    ) -> anyhow::Result<ApiVersionSet> {
1684        debug!("Refreshing common api versions");
1685
1686        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1687        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1688
1689        task_group.spawn_cancellable("refresh peers api versions", {
1690            Client::refresh_peers_api_versions(
1691                num_peers,
1692                api.clone(),
1693                db.clone(),
1694                num_responses_sender,
1695            )
1696        });
1697
1698        // Wait at most 15 seconds before calculating a set of common api versions to
1699        // use. Note that all peers individual responses from previous attempts
1700        // are still being used, and requests, or even retries for response of
1701        // peers are not actually cancelled, as they are happening on a separate
1702        // task. This is all just to bound the time user can be waiting
1703        // for the join operation to finish, at the risk of picking wrong version in
1704        // very rare circumstances.
1705        let _: Result<_, Elapsed> = runtime::timeout(
1706            Duration::from_secs(15),
1707            num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1708        )
1709        .await;
1710
1711        let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1712
1713        let common_api_versions = discover_common_api_versions_set(
1714            &Self::supported_api_versions_summary_static(config, client_module_init),
1715            &peer_api_version_sets,
1716        )?;
1717
1718        debug!(
1719            value = ?common_api_versions,
1720            "Updating the cached common api versions"
1721        );
1722        let mut dbtx = db.begin_transaction().await;
1723        let _ = dbtx
1724            .insert_entry(
1725                &CachedApiVersionSetKey,
1726                &CachedApiVersionSet(common_api_versions.clone()),
1727            )
1728            .await;
1729
1730        dbtx.commit_tx().await;
1731
1732        Ok(common_api_versions)
1733    }
1734
1735    /// Get the client [`Metadata`]
1736    pub async fn get_metadata(&self) -> Metadata {
1737        self.db
1738            .begin_transaction_nc()
1739            .await
1740            .get_value(&ClientMetadataKey)
1741            .await
1742            .unwrap_or_else(|| {
1743                warn!("Missing existing metadata. This key should have been set on Client init");
1744                Metadata::empty()
1745            })
1746    }
1747
1748    /// Set the client [`Metadata`]
1749    pub async fn set_metadata(&self, metadata: &Metadata) {
1750        self.db
1751            .autocommit::<_, _, anyhow::Error>(
1752                |dbtx, _| {
1753                    Box::pin(async {
1754                        Self::set_metadata_dbtx(dbtx, metadata).await;
1755                        Ok(())
1756                    })
1757                },
1758                None,
1759            )
1760            .await
1761            .expect("Failed to autocommit metadata");
1762    }
1763
1764    pub fn has_pending_recoveries(&self) -> bool {
1765        !self
1766            .client_recovery_progress_receiver
1767            .borrow()
1768            .iter()
1769            .all(|(_id, progress)| progress.is_done())
1770    }
1771
1772    /// Wait for all module recoveries to finish
1773    ///
1774    /// This will block until the recovery task is done with recoveries.
1775    /// Returns success if all recovery tasks are complete (success case),
1776    /// or an error if some modules could not complete the recovery at the time.
1777    ///
1778    /// A bit of a heavy approach.
1779    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1780        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1781        recovery_receiver
1782            .wait_for(|in_progress| {
1783                in_progress
1784                    .iter()
1785                    .all(|(_id, progress)| progress.is_done())
1786            })
1787            .await
1788            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1789
1790        Ok(())
1791    }
1792
1793    /// Subscribe to recover progress for all the modules.
1794    ///
1795    /// This stream can contain duplicate progress for a module.
1796    /// Don't use this stream for detecting completion of recovery.
1797    pub fn subscribe_to_recovery_progress(
1798        &self,
1799    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
1800        WatchStream::new(self.client_recovery_progress_receiver.clone())
1801            .flat_map(futures::stream::iter)
1802    }
1803
1804    pub async fn wait_for_module_kind_recovery(
1805        &self,
1806        module_kind: ModuleKind,
1807    ) -> anyhow::Result<()> {
1808        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1809        let config = self.config().await;
1810        recovery_receiver
1811            .wait_for(|in_progress| {
1812                !in_progress
1813                    .iter()
1814                    .filter(|(module_instance_id, _progress)| {
1815                        config.modules[module_instance_id].kind == module_kind
1816                    })
1817                    .any(|(_id, progress)| !progress.is_done())
1818            })
1819            .await
1820            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1821
1822        Ok(())
1823    }
1824
1825    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1826        loop {
1827            if self.executor.get_active_states().await.is_empty() {
1828                break;
1829            }
1830            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1831        }
1832        Ok(())
1833    }
1834
1835    /// Set the client [`Metadata`]
1836    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1837        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1838    }
1839
1840    fn spawn_module_recoveries_task(
1841        &self,
1842        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1843        module_recoveries: BTreeMap<
1844            ModuleInstanceId,
1845            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1846        >,
1847        module_recovery_progress_receivers: BTreeMap<
1848            ModuleInstanceId,
1849            watch::Receiver<RecoveryProgress>,
1850        >,
1851    ) {
1852        let db = self.db.clone();
1853        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1854        self.task_group
1855            .spawn("module recoveries", |_task_handle| async {
1856                Self::run_module_recoveries_task(
1857                    db,
1858                    log_ordering_wakeup_tx,
1859                    recovery_sender,
1860                    module_recoveries,
1861                    module_recovery_progress_receivers,
1862                )
1863                .await;
1864            });
1865    }
1866
1867    async fn run_module_recoveries_task(
1868        db: Database,
1869        log_ordering_wakeup_tx: watch::Sender<()>,
1870        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1871        module_recoveries: BTreeMap<
1872            ModuleInstanceId,
1873            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1874        >,
1875        module_recovery_progress_receivers: BTreeMap<
1876            ModuleInstanceId,
1877            watch::Receiver<RecoveryProgress>,
1878        >,
1879    ) {
1880        debug!(target:LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1881        let mut completed_stream = Vec::new();
1882        let progress_stream = futures::stream::FuturesUnordered::new();
1883
1884        for (module_instance_id, f) in module_recoveries {
1885            completed_stream.push(futures::stream::once(Box::pin(async move {
1886                match f.await {
1887                    Ok(()) => (module_instance_id, None),
1888                    Err(err) => {
1889                        warn!(%err, module_instance_id, "Module recovery failed");
1890                        // a module recovery that failed reports and error and
1891                        // just never finishes, so we don't need a separate state
1892                        // for it
1893                        futures::future::pending::<()>().await;
1894                        unreachable!()
1895                    }
1896                }
1897            })));
1898        }
1899
1900        for (module_instance_id, rx) in module_recovery_progress_receivers {
1901            progress_stream.push(
1902                tokio_stream::wrappers::WatchStream::new(rx)
1903                    .fuse()
1904                    .map(move |progress| (module_instance_id, Some(progress))),
1905            );
1906        }
1907
1908        let mut futures = futures::stream::select(
1909            futures::stream::select_all(progress_stream),
1910            futures::stream::select_all(completed_stream),
1911        );
1912
1913        while let Some((module_instance_id, progress)) = futures.next().await {
1914            let mut dbtx = db.begin_transaction().await;
1915
1916            let prev_progress = *recovery_sender
1917                .borrow()
1918                .get(&module_instance_id)
1919                .expect("existing progress must be present");
1920
1921            let progress = if prev_progress.is_done() {
1922                // since updates might be out of order, once done, stick with it
1923                prev_progress
1924            } else if let Some(progress) = progress {
1925                progress
1926            } else {
1927                prev_progress.to_complete()
1928            };
1929
1930            if !prev_progress.is_done() && progress.is_done() {
1931                info!(
1932                    module_instance_id,
1933                    prev_progress = format!("{}/{}", prev_progress.complete, prev_progress.total),
1934                    progress = format!("{}/{}", progress.complete, progress.total),
1935                    "Recovery complete"
1936                );
1937                dbtx.log_event(
1938                    log_ordering_wakeup_tx.clone(),
1939                    None,
1940                    ModuleRecoveryCompleted {
1941                        module_id: module_instance_id,
1942                    },
1943                )
1944                .await;
1945            } else {
1946                info!(
1947                    module_instance_id,
1948                    prev_progress = format!("{}/{}", prev_progress.complete, prev_progress.total),
1949                    progress = format!("{}/{}", progress.complete, progress.total),
1950                    "Recovery progress"
1951                );
1952            }
1953
1954            dbtx.insert_entry(
1955                &ClientModuleRecovery { module_instance_id },
1956                &ClientModuleRecoveryState { progress },
1957            )
1958            .await;
1959            dbtx.commit_tx().await;
1960
1961            recovery_sender.send_modify(|v| {
1962                v.insert(module_instance_id, progress);
1963            });
1964        }
1965        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1966    }
1967
1968    async fn load_peers_last_api_versions(
1969        db: &Database,
1970        num_peers: NumPeers,
1971    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1972        let mut peer_api_version_sets = BTreeMap::new();
1973
1974        let mut dbtx = db.begin_transaction_nc().await;
1975        for peer_id in num_peers.peer_ids() {
1976            if let Some(v) = dbtx
1977                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1978                .await
1979            {
1980                peer_api_version_sets.insert(peer_id, v.0);
1981            }
1982        }
1983        drop(dbtx);
1984        peer_api_version_sets
1985    }
1986
1987    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1988    /// only the announcements and doesn't use the config as fallback.
1989    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1990        self.db()
1991            .begin_transaction_nc()
1992            .await
1993            .find_by_prefix(&ApiAnnouncementPrefix)
1994            .await
1995            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1996            .collect()
1997            .await
1998    }
1999
2000    /// Returns a list of guardian API URLs
2001    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
2002        get_api_urls(&self.db, &self.config().await).await
2003    }
2004
2005    /// Create an invite code with the api endpoint of the given peer which can
2006    /// be used to download this client config
2007    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2008        self.get_peer_urls()
2009            .await
2010            .into_iter()
2011            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
2012            .map(|peer_url| {
2013                InviteCode::new(
2014                    peer_url.clone(),
2015                    peer,
2016                    self.federation_id(),
2017                    self.api_secret.clone(),
2018                )
2019            })
2020    }
2021
2022    /// Blocks till the client has synced the guardian public key set
2023    /// (introduced in version 0.4) and returns it. Once it has been fetched
2024    /// once this function is guaranteed to return immediately.
2025    pub async fn get_guardian_public_keys_blocking(
2026        &self,
2027    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
2028        self.db.autocommit(|dbtx, _| Box::pin(async move {
2029            let config = self.config().await;
2030
2031            let guardian_pub_keys = if let Some(guardian_pub_keys) = config.global.broadcast_public_keys {guardian_pub_keys}else{
2032                let fetched_config = retry(
2033                    "Fetching guardian public keys",
2034                    backoff_util::background_backoff(),
2035                    || async {
2036                        Ok(self.api.request_current_consensus::<ClientConfig>(
2037                            CLIENT_CONFIG_ENDPOINT.to_owned(),
2038                            ApiRequestErased::default(),
2039                        ).await?)
2040                    },
2041                )
2042                .await
2043                .expect("Will never return on error");
2044
2045                let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
2046                    warn!("Guardian public keys not found in fetched config, server not updated to 0.4 yet");
2047                    pending::<()>().await;
2048                    unreachable!("Pending will never return");
2049                };
2050
2051                let new_config = ClientConfig {
2052                    global: GlobalClientConfig {
2053                        broadcast_public_keys: Some(guardian_pub_keys.clone()),
2054                        ..config.global
2055                    },
2056                    modules: config.modules,
2057                };
2058
2059                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
2060                *(self.config.write().await) = new_config;
2061                guardian_pub_keys
2062            };
2063
2064            Result::<_, ()>::Ok(guardian_pub_keys)
2065        }), None).await.expect("Will retry forever")
2066    }
2067
2068    pub fn handle_global_rpc(
2069        &self,
2070        method: String,
2071        params: serde_json::Value,
2072    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
2073        Box::pin(try_stream! {
2074            match method.as_str() {
2075                "get_balance" => {
2076                    let balance = self.get_balance().await;
2077                    yield serde_json::to_value(balance)?;
2078                }
2079                "subscribe_balance_changes" => {
2080                    let mut stream = self.subscribe_balance_changes().await;
2081                    while let Some(balance) = stream.next().await {
2082                        yield serde_json::to_value(balance)?;
2083                    }
2084                }
2085                "get_config" => {
2086                    let config = self.config().await;
2087                    yield serde_json::to_value(config)?;
2088                }
2089                "get_federation_id" => {
2090                    let federation_id = self.federation_id();
2091                    yield serde_json::to_value(federation_id)?;
2092                }
2093                "get_invite_code" => {
2094                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
2095                    let invite_code = self.invite_code(req.peer).await;
2096                    yield serde_json::to_value(invite_code)?;
2097                }
2098                "list_operations" => {
2099                    // TODO: support pagination
2100                    let operations = self.operation_log().list_operations(usize::MAX, None).await;
2101                    yield serde_json::to_value(operations)?;
2102                }
2103                "has_pending_recoveries" => {
2104                    let has_pending = self.has_pending_recoveries();
2105                    yield serde_json::to_value(has_pending)?;
2106                }
2107                "wait_for_all_recoveries" => {
2108                    self.wait_for_all_recoveries().await?;
2109                    yield serde_json::Value::Null;
2110                }
2111                "subscribe_to_recovery_progress" => {
2112                    let mut stream = self.subscribe_to_recovery_progress();
2113                    while let Some((module_id, progress)) = stream.next().await {
2114                        yield serde_json::json!({
2115                            "module_id": module_id,
2116                            "progress": progress
2117                        });
2118                    }
2119                }
2120                _ => {
2121                    Err(anyhow::format_err!("Unknown method: {}", method))?;
2122                    unreachable!()
2123                },
2124            }
2125        })
2126    }
2127
2128    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
2129    where
2130        E: Event + Send,
2131    {
2132        let mut dbtx = self.db.begin_transaction().await;
2133        self.log_event_dbtx(&mut dbtx, module_id, event).await;
2134        dbtx.commit_tx().await;
2135    }
2136
2137    pub async fn log_event_dbtx<E, Cap>(
2138        &self,
2139        dbtx: &mut DatabaseTransaction<'_, Cap>,
2140        module_id: Option<ModuleInstanceId>,
2141        event: E,
2142    ) where
2143        E: Event + Send,
2144        Cap: Send,
2145    {
2146        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
2147            .await;
2148    }
2149
2150    pub async fn log_event_raw_dbtx<Cap>(
2151        &self,
2152        dbtx: &mut DatabaseTransaction<'_, Cap>,
2153        kind: EventKind,
2154        module: Option<(ModuleKind, ModuleInstanceId)>,
2155        payload: Vec<u8>,
2156        transient: bool,
2157    ) where
2158        Cap: Send,
2159    {
2160        let module_id = module.as_ref().map(|m| m.1);
2161        let module_kind = module.map(|m| m.0);
2162        dbtx.log_event_raw(
2163            self.log_ordering_wakeup_tx.clone(),
2164            kind,
2165            module_kind,
2166            module_id,
2167            payload,
2168            transient,
2169        )
2170        .await;
2171    }
2172
2173    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
2174    where
2175        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
2176        K: DatabaseRecord<Value = EventLogId>,
2177        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2178        R: Future<Output = anyhow::Result<()>>,
2179    {
2180        event_log::handle_events(
2181            self.db.clone(),
2182            pos_key,
2183            self.log_event_added_rx.clone(),
2184            call_fn,
2185        )
2186        .await
2187    }
2188
2189    pub async fn get_event_log(
2190        &self,
2191        pos: Option<EventLogId>,
2192        limit: u64,
2193    ) -> Vec<(
2194        EventLogId,
2195        EventKind,
2196        Option<(ModuleKind, ModuleInstanceId)>,
2197        u64,
2198        serde_json::Value,
2199    )> {
2200        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2201            .await
2202    }
2203
2204    pub async fn get_event_log_dbtx<Cap>(
2205        &self,
2206        dbtx: &mut DatabaseTransaction<'_, Cap>,
2207        pos: Option<EventLogId>,
2208        limit: u64,
2209    ) -> Vec<(
2210        EventLogId,
2211        EventKind,
2212        Option<(ModuleKind, ModuleInstanceId)>,
2213        u64,
2214        serde_json::Value,
2215    )>
2216    where
2217        Cap: Send,
2218    {
2219        dbtx.get_event_log(pos, limit).await
2220    }
2221
2222    /// Register to receiver all new transient (unpersisted) events
2223    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
2224        self.log_event_added_transient_tx.subscribe()
2225    }
2226}
2227
2228#[derive(Deserialize)]
2229struct GetInviteCodeRequest {
2230    peer: PeerId,
2231}
2232
2233/// See [`Client::transaction_updates`]
2234pub struct TransactionUpdates {
2235    update_stream: BoxStream<'static, TxSubmissionStatesSM>,
2236}
2237
2238impl TransactionUpdates {
2239    /// Waits for the transaction to be accepted or rejected as part of the
2240    /// operation to which the `TransactionUpdates` object is subscribed.
2241    pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
2242        debug!(target: LOG_CLIENT, %await_txid, "Await tx accepted");
2243        self.update_stream
2244            .filter_map(|tx_update| {
2245                std::future::ready(match tx_update.state {
2246                    TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
2247                    TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
2248                        Some(Err(submit_error))
2249                    }
2250                    _ => None,
2251                })
2252            })
2253            .next_or_pending()
2254            .await?;
2255        debug!(target: LOG_CLIENT, %await_txid, "Tx accepted");
2256        Ok(())
2257    }
2258}
2259
2260/// Admin (guardian) identification and authentication
2261pub struct AdminCreds {
2262    /// Guardian's own `peer_id`
2263    pub peer_id: PeerId,
2264    /// Authentication details
2265    pub auth: ApiAuth,
2266}
2267
2268/// Used to configure, assemble and build [`Client`]
2269pub struct ClientBuilder {
2270    module_inits: ClientModuleInitRegistry,
2271    primary_module_instance: Option<ModuleInstanceId>,
2272    primary_module_kind: Option<ModuleKind>,
2273    admin_creds: Option<AdminCreds>,
2274    db_no_decoders: Database,
2275    meta_service: Arc<MetaService>,
2276    connector: Connector,
2277    stopped: bool,
2278    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
2279}
2280
2281impl ClientBuilder {
2282    fn new(db: Database) -> Self {
2283        let meta_service = MetaService::new(LegacyMetaSource::default());
2284        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
2285            broadcast::channel(1024);
2286        ClientBuilder {
2287            module_inits: ModuleInitRegistry::new(),
2288            primary_module_instance: None,
2289            primary_module_kind: None,
2290            connector: Connector::default(),
2291            admin_creds: None,
2292            db_no_decoders: db,
2293            stopped: false,
2294            meta_service,
2295            log_event_added_transient_tx,
2296        }
2297    }
2298
2299    fn from_existing(client: &Client) -> Self {
2300        ClientBuilder {
2301            module_inits: client.module_inits.clone(),
2302            primary_module_instance: Some(client.primary_module_instance),
2303            primary_module_kind: None,
2304            admin_creds: None,
2305            db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
2306            stopped: false,
2307            // non unique
2308            meta_service: client.meta_service.clone(),
2309            connector: client.connector,
2310            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
2311        }
2312    }
2313
2314    /// Replace module generator registry entirely
2315    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
2316        self.module_inits = module_inits;
2317    }
2318
2319    /// Make module generator available when reading the config
2320    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
2321        self.module_inits.attach(module_init);
2322    }
2323
2324    pub fn stopped(&mut self) {
2325        self.stopped = true;
2326    }
2327
2328    /// Uses this module with the given instance id as the primary module. See
2329    /// [`ClientModule::supports_being_primary`] for more information.
2330    ///
2331    /// ## Panics
2332    /// If there was a primary module specified previously
2333    #[deprecated(
2334        since = "0.6.0",
2335        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
2336    )]
2337    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
2338        self.with_primary_module_instance_id(primary_module_instance);
2339    }
2340
2341    /// **You are likely looking for
2342    /// [`ClientBuilder::with_primary_module_kind`]. This function is rarely
2343    /// useful and often dangerous, handle with care.**
2344    ///
2345    /// Uses this module with the given instance id as the primary module. See
2346    /// [`ClientModule::supports_being_primary`] for more information. Since the
2347    /// module instance id of modules of a specific kind may differ between
2348    /// different federations it is generally not recommended to specify it, but
2349    /// rather to specify the module kind that should be used as primary. See
2350    /// [`ClientBuilder::with_primary_module_kind`].
2351    ///
2352    /// ## Panics
2353    /// If there was a primary module specified previously
2354    pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
2355        let was_replaced = self
2356            .primary_module_instance
2357            .replace(primary_module_instance)
2358            .is_some();
2359        assert!(
2360            !was_replaced,
2361            "Only one primary module can be given to the builder."
2362        );
2363    }
2364
2365    /// Uses this module kind as the primary module if present in the config.
2366    /// See [`ClientModule::supports_being_primary`] for more information.
2367    ///
2368    /// ## Panics
2369    /// If there was a primary module kind specified previously
2370    pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
2371        let was_replaced = self
2372            .primary_module_kind
2373            .replace(primary_module_kind)
2374            .is_some();
2375        assert!(
2376            !was_replaced,
2377            "Only one primary module kind can be given to the builder."
2378        );
2379    }
2380
2381    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
2382        self.meta_service = meta_service;
2383    }
2384
2385    async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> {
2386        // Only apply the client database migrations if the database has been
2387        // initialized.
2388        // This only works as long as you don't change the client config
2389        if let Ok(client_config) = self.load_existing_config().await {
2390            for (module_id, module_cfg) in client_config.modules {
2391                let kind = module_cfg.kind.clone();
2392                let Some(init) = self.module_inits.get(&kind) else {
2393                    // normal, expected and already logged about when building the client
2394                    continue;
2395                };
2396
2397                apply_migrations_client(
2398                    db,
2399                    kind.to_string(),
2400                    init.get_database_migrations(),
2401                    module_id,
2402                )
2403                .await?;
2404            }
2405        }
2406
2407        Ok(())
2408    }
2409
2410    pub fn db_no_decoders(&self) -> &Database {
2411        &self.db_no_decoders
2412    }
2413
2414    pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
2415        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2416            bail!("Client database not initialized")
2417        };
2418
2419        Ok(config)
2420    }
2421
2422    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
2423        self.admin_creds = Some(creds);
2424    }
2425
2426    pub fn with_connector(&mut self, connector: Connector) {
2427        self.connector = connector;
2428    }
2429
2430    #[cfg(feature = "tor")]
2431    pub fn with_tor_connector(&mut self) {
2432        self.with_connector(Connector::tor());
2433    }
2434
2435    async fn init(
2436        self,
2437        pre_root_secret: DerivableSecret,
2438        config: ClientConfig,
2439        api_secret: Option<String>,
2440        init_mode: InitMode,
2441    ) -> anyhow::Result<ClientHandle> {
2442        if Client::is_initialized(&self.db_no_decoders).await {
2443            bail!("Client database already initialized")
2444        }
2445
2446        // Note: It's important all client initialization is performed as one big
2447        // transaction to avoid half-initialized client state.
2448        {
2449            debug!(target: LOG_CLIENT, "Initializing client database");
2450            let mut dbtx = self.db_no_decoders.begin_transaction().await;
2451            // Save config to DB
2452            dbtx.insert_new_entry(&ClientConfigKey, &config).await;
2453            dbtx.insert_entry(
2454                &ClientPreRootSecretHashKey,
2455                &pre_root_secret.derive_pre_root_secret_hash(),
2456            )
2457            .await;
2458
2459            if let Some(api_secret) = api_secret.as_ref() {
2460                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
2461            }
2462
2463            let init_state = InitState::Pending(init_mode);
2464            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
2465
2466            let metadata = init_state
2467                .does_require_recovery()
2468                .flatten()
2469                .map_or(Metadata::empty(), |s| s.metadata);
2470
2471            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
2472
2473            dbtx.commit_tx_result().await?;
2474        }
2475
2476        let stopped = self.stopped;
2477        self.build(pre_root_secret, config, api_secret, stopped)
2478            .await
2479    }
2480
2481    /// Join a new Federation
2482    ///
2483    /// When a user wants to connect to a new federation this function fetches
2484    /// the federation config and initializes the client database. If a user
2485    /// already joined the federation in the past and has a preexisting database
2486    /// use [`ClientBuilder::open`] instead.
2487    ///
2488    /// **Warning**: Calling `join` with a `root_secret` key that was used
2489    /// previous to `join` a Federation will lead to all sorts of malfunctions
2490    /// including likely loss of funds.
2491    ///
2492    /// This should be generally called only if the `root_secret` key is known
2493    /// not to have been used before (e.g. just randomly generated). For keys
2494    /// that might have been previous used (e.g. provided by the user),
2495    /// it's safer to call [`Self::recover`] which will attempt to recover
2496    /// client module states for the Federation.
2497    ///
2498    /// A typical "join federation" flow would look as follows:
2499    /// ```no_run
2500    /// # use std::str::FromStr;
2501    /// # use fedimint_core::invite_code::InviteCode;
2502    /// # use fedimint_core::config::ClientConfig;
2503    /// # use fedimint_derive_secret::DerivableSecret;
2504    /// # use fedimint_client::{Client, ClientBuilder};
2505    /// # use fedimint_core::db::Database;
2506    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
2507    /// #
2508    /// # #[tokio::main]
2509    /// # async fn main() {
2510    /// # let root_secret: DerivableSecret = unimplemented!();
2511    /// // Create a root secret, e.g. via fedimint-bip39, see also:
2512    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
2513    /// // let root_secret = …;
2514    ///
2515    /// // Get invite code from user
2516    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
2517    ///     .expect("Invalid invite code");
2518    /// let config = fedimint_api_client::api::net::Connector::default().download_from_invite_code(&invite_code).await
2519    ///     .expect("Error downloading config");
2520    ///
2521    /// // Tell the user the federation name, bitcoin network
2522    /// // (e.g. from wallet module config), and other details
2523    /// // that are typically contained in the federation's
2524    /// // meta fields.
2525    ///
2526    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
2527    /// //     .expect("Module not found")
2528    /// //     .network;
2529    ///
2530    /// println!(
2531    ///     "The federation name is: {}",
2532    ///     config.meta::<String>(META_FEDERATION_NAME_KEY)
2533    ///         .expect("Could not decode name field")
2534    ///         .expect("Name isn't set")
2535    /// );
2536    ///
2537    /// // Open the client's database, using the federation ID
2538    /// // as the DB name is a common pattern:
2539    ///
2540    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
2541    /// // let db = RocksDb::open(db_path).expect("error opening DB");
2542    /// # let db: Database = unimplemented!();
2543    ///
2544    /// let client = Client::builder(db).await.expect("Error building client")
2545    ///     // Mount the modules the client should support:
2546    ///     // .with_module(LightningClientInit)
2547    ///     // .with_module(MintClientInit)
2548    ///     // .with_module(WalletClientInit::default())
2549    ///     .join(root_secret, config, None)
2550    ///     .await
2551    ///     .expect("Error joining federation");
2552    /// # }
2553    /// ```
2554    pub async fn join(
2555        self,
2556        pre_root_secret: DerivableSecret,
2557        config: ClientConfig,
2558        api_secret: Option<String>,
2559    ) -> anyhow::Result<ClientHandle> {
2560        self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
2561            .await
2562    }
2563
2564    /// Download most recent valid backup found from the Federation
2565    pub async fn download_backup_from_federation(
2566        &self,
2567        root_secret: &DerivableSecret,
2568        config: &ClientConfig,
2569        api_secret: Option<String>,
2570    ) -> anyhow::Result<Option<ClientBackup>> {
2571        let connector = self.connector;
2572        let api = DynGlobalApi::from_endpoints(
2573            // TODO: change join logic to use FederationId v2
2574            config
2575                .global
2576                .api_endpoints
2577                .iter()
2578                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
2579            &api_secret,
2580            &connector,
2581        );
2582        Client::download_backup_from_federation_static(
2583            &api,
2584            &Self::federation_root_secret(root_secret, config),
2585            &self.decoders(config),
2586        )
2587        .await
2588    }
2589
2590    /// Join a (possibly) previous joined Federation
2591    ///
2592    /// Unlike [`Self::join`], `recover` will run client module recovery for
2593    /// each client module attempting to recover any previous module state.
2594    ///
2595    /// Recovery process takes time during which each recovering client module
2596    /// will not be available for use.
2597    ///
2598    /// Calling `recovery` with a `root_secret` that was not actually previous
2599    /// used in a given Federation is safe.
2600    pub async fn recover(
2601        self,
2602        root_secret: DerivableSecret,
2603        config: ClientConfig,
2604        api_secret: Option<String>,
2605        backup: Option<ClientBackup>,
2606    ) -> anyhow::Result<ClientHandle> {
2607        let client = self
2608            .init(
2609                root_secret,
2610                config,
2611                api_secret,
2612                InitMode::Recover {
2613                    snapshot: backup.clone(),
2614                },
2615            )
2616            .await?;
2617
2618        Ok(client)
2619    }
2620
2621    pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
2622        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2623            bail!("Client database not initialized")
2624        };
2625
2626        if let Some(secret_hash) = self
2627            .db_no_decoders()
2628            .begin_transaction_nc()
2629            .await
2630            .get_value(&ClientPreRootSecretHashKey)
2631            .await
2632        {
2633            ensure!(
2634                pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
2635                "Secret hash does not match. Incorrect secret"
2636            );
2637        } else {
2638            debug!(target: LOG_CLIENT, "Backfilling secret hash");
2639            // Note: no need for dbtx autocommit, we are the only writer ATM
2640            let mut dbtx = self.db_no_decoders.begin_transaction().await;
2641            dbtx.insert_entry(
2642                &ClientPreRootSecretHashKey,
2643                &pre_root_secret.derive_pre_root_secret_hash(),
2644            )
2645            .await;
2646            dbtx.commit_tx().await;
2647        }
2648
2649        let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
2650        let stopped = self.stopped;
2651
2652        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2653        let client = self
2654            .build_stopped(
2655                pre_root_secret,
2656                &config,
2657                api_secret,
2658                log_event_added_transient_tx,
2659            )
2660            .await?;
2661        if !stopped {
2662            client.as_inner().start_executor();
2663        }
2664        Ok(client)
2665    }
2666
2667    /// Build a [`Client`] and start the executor
2668    async fn build(
2669        self,
2670        pre_root_secret: DerivableSecret,
2671        config: ClientConfig,
2672        api_secret: Option<String>,
2673        stopped: bool,
2674    ) -> anyhow::Result<ClientHandle> {
2675        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2676        let client = self
2677            .build_stopped(
2678                pre_root_secret,
2679                &config,
2680                api_secret,
2681                log_event_added_transient_tx,
2682            )
2683            .await?;
2684        if !stopped {
2685            client.as_inner().start_executor();
2686        }
2687
2688        Ok(client)
2689    }
2690
2691    // TODO: remove config argument
2692    /// Build a [`Client`] but do not start the executor
2693    async fn build_stopped(
2694        self,
2695        root_secret: DerivableSecret,
2696        config: &ClientConfig,
2697        api_secret: Option<String>,
2698        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
2699    ) -> anyhow::Result<ClientHandle> {
2700        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
2701        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
2702
2703        let decoders = self.decoders(config);
2704        let config = Self::config_decoded(config, &decoders)?;
2705        let fed_id = config.calculate_federation_id();
2706        let db = self.db_no_decoders.with_decoders(decoders.clone());
2707        let connector = self.connector;
2708        let peer_urls = get_api_urls(&db, &config).await;
2709        let api = if let Some(admin_creds) = self.admin_creds.as_ref() {
2710            WsFederationApi::new_admin(
2711                admin_creds.peer_id,
2712                peer_urls
2713                    .into_iter()
2714                    .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
2715                    .context("Admin creds should match a peer")?,
2716                &api_secret,
2717                &connector,
2718            )
2719            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2720            .with_cache()
2721            .into()
2722        } else {
2723            WsFederationApi::from_endpoints(peer_urls, &api_secret, &connector)
2724                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2725                .with_cache()
2726                .into()
2727        };
2728        let task_group = TaskGroup::new();
2729
2730        // Migrate the database before interacting with it in case any on-disk data
2731        // structures have changed.
2732        self.migrate_database(&db).await?;
2733
2734        let init_state = Self::load_init_state(&db).await;
2735
2736        let primary_module_instance = self
2737            .primary_module_instance
2738            .or_else(|| {
2739                let primary_module_kind = self.primary_module_kind?;
2740                config
2741                    .modules
2742                    .iter()
2743                    .find_map(|(module_instance_id, module_config)| {
2744                        (module_config.kind() == &primary_module_kind)
2745                            .then_some(*module_instance_id)
2746                    })
2747            })
2748            .ok_or(anyhow!("No primary module set or found"))?;
2749
2750        let notifier = Notifier::new(db.clone());
2751
2752        let common_api_versions = Client::load_and_refresh_common_api_version_static(
2753            &config,
2754            &self.module_inits,
2755            &api,
2756            &db,
2757            &task_group,
2758        )
2759        .await
2760        .inspect_err(|err| {
2761            warn!(target: LOG_CLIENT, %err, "Failed to discover initial API version to use.");
2762        })
2763        .unwrap_or(ApiVersionSet {
2764            core: ApiVersion::new(0, 0),
2765            // This will cause all modules to skip initialization
2766            modules: BTreeMap::new(),
2767        });
2768
2769        debug!(?common_api_versions, "Completed api version negotiation");
2770
2771        let mut module_recoveries: BTreeMap<
2772            ModuleInstanceId,
2773            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
2774        > = BTreeMap::new();
2775        let mut module_recovery_progress_receivers: BTreeMap<
2776            ModuleInstanceId,
2777            watch::Receiver<RecoveryProgress>,
2778        > = BTreeMap::new();
2779
2780        let final_client = FinalClient::default();
2781
2782        let root_secret = Self::federation_root_secret(&root_secret, &config);
2783
2784        let modules = {
2785            let mut modules = ClientModuleRegistry::default();
2786            for (module_instance_id, module_config) in config.modules.clone() {
2787                let kind = module_config.kind().clone();
2788                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
2789                    debug!("Module kind {kind} of instance {module_instance_id} not found in module gens, skipping");
2790                    continue;
2791                };
2792
2793                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
2794                else {
2795                    warn!("Module kind {kind} of instance {module_instance_id} has not compatible api version, skipping");
2796                    continue;
2797                };
2798
2799                // since the exact logic of when to start recovery is a bit gnarly,
2800                // the recovery call is extracted here.
2801                let start_module_recover_fn =
2802                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
2803                        let module_config = module_config.clone();
2804                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
2805                        let db = db.clone();
2806                        let kind = kind.clone();
2807                        let notifier = notifier.clone();
2808                        let api = api.clone();
2809                        let root_secret = root_secret.clone();
2810                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
2811                        let final_client = final_client.clone();
2812                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
2813                        let task_group = task_group.clone();
2814                        let module_init = module_init.clone();
2815                        (
2816                            Box::pin(async move {
2817                                module_init
2818                                    .recover(
2819                                        final_client.clone(),
2820                                        fed_id,
2821                                        num_peers,
2822                                        module_config.clone(),
2823                                        db.clone(),
2824                                        module_instance_id,
2825                                        common_api_versions.core,
2826                                        api_version,
2827                                        root_secret.derive_module_secret(module_instance_id),
2828                                        notifier.clone(),
2829                                        api.clone(),
2830                                        admin_auth,
2831                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
2832                                        progress_tx,
2833                                        task_group,
2834                                    )
2835                                    .await
2836                                    .map_err(|err| {
2837                                        warn!(
2838                                                module_id = module_instance_id, %kind, %err, "Module failed to recover"
2839                                            );
2840                                        err
2841                                    })
2842                            }),
2843                            progress_rx,
2844                        )
2845                    };
2846
2847                let recovery = if let Some(snapshot) = init_state.does_require_recovery() {
2848                    if let Some(module_recovery_state) = db
2849                        .begin_transaction_nc()
2850                        .await
2851                        .get_value(&ClientModuleRecovery { module_instance_id })
2852                        .await
2853                    {
2854                        if module_recovery_state.is_done() {
2855                            debug!(
2856                                id = %module_instance_id,
2857                                %kind, "Module recovery already complete"
2858                            );
2859                            None
2860                        } else {
2861                            debug!(
2862                                id = %module_instance_id,
2863                                %kind,
2864                                progress = %module_recovery_state.progress,
2865                                "Starting module recovery with an existing progress"
2866                            );
2867                            Some(start_module_recover_fn(
2868                                snapshot,
2869                                module_recovery_state.progress,
2870                            ))
2871                        }
2872                    } else {
2873                        let progress = RecoveryProgress::none();
2874                        let mut dbtx = db.begin_transaction().await;
2875                        dbtx.log_event(
2876                            log_ordering_wakeup_tx.clone(),
2877                            None,
2878                            ModuleRecoveryStarted {
2879                                module_id: module_instance_id,
2880                            },
2881                        )
2882                        .await;
2883                        dbtx.insert_entry(
2884                            &ClientModuleRecovery { module_instance_id },
2885                            &ClientModuleRecoveryState { progress },
2886                        )
2887                        .await;
2888
2889                        dbtx.commit_tx().await;
2890
2891                        debug!(
2892                            id = %module_instance_id,
2893                            %kind, "Starting new module recovery"
2894                        );
2895                        Some(start_module_recover_fn(snapshot, progress))
2896                    }
2897                } else {
2898                    None
2899                };
2900
2901                if let Some((recovery, recovery_progress_rx)) = recovery {
2902                    module_recoveries.insert(module_instance_id, recovery);
2903                    module_recovery_progress_receivers
2904                        .insert(module_instance_id, recovery_progress_rx);
2905                } else {
2906                    let module = module_init
2907                        .init(
2908                            final_client.clone(),
2909                            fed_id,
2910                            config.global.api_endpoints.len(),
2911                            module_config,
2912                            db.clone(),
2913                            module_instance_id,
2914                            common_api_versions.core,
2915                            api_version,
2916                            // This is a divergence from the legacy client, where the child secret
2917                            // keys were derived using *module kind*-specific derivation paths.
2918                            // Since the new client has to support multiple, segregated modules of
2919                            // the same kind we have to use the instance id instead.
2920                            root_secret.derive_module_secret(module_instance_id),
2921                            notifier.clone(),
2922                            api.clone(),
2923                            self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
2924                            task_group.clone(),
2925                        )
2926                        .await?;
2927
2928                    if primary_module_instance == module_instance_id
2929                        && !module.supports_being_primary()
2930                    {
2931                        bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module");
2932                    }
2933
2934                    modules.register_module(module_instance_id, kind, module);
2935                }
2936            }
2937            modules
2938        };
2939
2940        if init_state.is_pending() && module_recoveries.is_empty() {
2941            let mut dbtx = db.begin_transaction().await;
2942            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
2943                .await;
2944            dbtx.commit_tx().await;
2945        }
2946
2947        let executor = {
2948            let mut executor_builder = Executor::builder();
2949            executor_builder
2950                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
2951
2952            for (module_instance_id, _, module) in modules.iter_modules() {
2953                executor_builder.with_module_dyn(module.context(module_instance_id));
2954            }
2955
2956            for module_instance_id in module_recoveries.keys() {
2957                executor_builder.with_valid_module_id(*module_instance_id);
2958            }
2959
2960            executor_builder.build(db.clone(), notifier, task_group.clone())
2961        };
2962
2963        let recovery_receiver_init_val = module_recovery_progress_receivers
2964            .iter()
2965            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
2966            .collect::<BTreeMap<_, _>>();
2967        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
2968            watch::channel(recovery_receiver_init_val);
2969
2970        let client_inner = Arc::new(Client {
2971            config: RwLock::new(config.clone()),
2972            api_secret,
2973            decoders,
2974            db: db.clone(),
2975            federation_id: fed_id,
2976            federation_meta: config.global.meta,
2977            primary_module_instance,
2978            modules,
2979            module_inits: self.module_inits.clone(),
2980            log_ordering_wakeup_tx,
2981            log_event_added_rx,
2982            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
2983            executor,
2984            api,
2985            secp_ctx: Secp256k1::new(),
2986            root_secret,
2987            task_group,
2988            operation_log: OperationLog::new(db.clone()),
2989            client_recovery_progress_receiver,
2990            meta_service: self.meta_service,
2991            connector,
2992        });
2993        client_inner
2994            .task_group
2995            .spawn_cancellable("MetaService::update_continuously", {
2996                let client_inner = client_inner.clone();
2997                async move {
2998                    client_inner
2999                        .meta_service
3000                        .update_continuously(&client_inner)
3001                        .await;
3002                }
3003            });
3004
3005        client_inner.task_group.spawn_cancellable(
3006            "update-api-announcements",
3007            run_api_announcement_sync(client_inner.clone()),
3008        );
3009
3010        client_inner.task_group.spawn_cancellable(
3011            "event log ordering task",
3012            run_event_log_ordering_task(
3013                db.clone(),
3014                log_ordering_wakeup_rx,
3015                log_event_added_tx,
3016                log_event_added_transient_tx,
3017            ),
3018        );
3019        let client_arc = ClientHandle::new(client_inner);
3020
3021        for (_, _, module) in client_arc.modules.iter_modules() {
3022            module.start().await;
3023        }
3024
3025        final_client.set(client_arc.downgrade());
3026
3027        if !module_recoveries.is_empty() {
3028            client_arc.spawn_module_recoveries_task(
3029                client_recovery_progress_sender,
3030                module_recoveries,
3031                module_recovery_progress_receivers,
3032            );
3033        }
3034
3035        Ok(client_arc)
3036    }
3037
3038    async fn load_init_state(db: &Database) -> InitState {
3039        let mut dbtx = db.begin_transaction_nc().await;
3040        dbtx.get_value(&ClientInitStateKey)
3041            .await
3042            .unwrap_or_else(|| {
3043                // could be turned in a hard error in the future, but for now
3044                // no need to break backward compat.
3045                warn!("Client missing ClientRequiresRecovery: assuming complete");
3046                db::InitState::Complete(db::InitModeComplete::Fresh)
3047            })
3048    }
3049
3050    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
3051        let mut decoders = client_decoders(
3052            &self.module_inits,
3053            config
3054                .modules
3055                .iter()
3056                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
3057        );
3058
3059        decoders.register_module(
3060            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
3061            ModuleKind::from_static_str("tx_submission"),
3062            tx_submission_sm_decoder(),
3063        );
3064
3065        decoders
3066    }
3067
3068    fn config_decoded(
3069        config: &ClientConfig,
3070        decoders: &ModuleDecoderRegistry,
3071    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
3072        config.clone().redecode_raw(decoders)
3073    }
3074
3075    /// Re-derive client's `root_secret` using the federation ID. This
3076    /// eliminates the possibility of having the same client `root_secret`
3077    /// across multiple federations.
3078    fn federation_root_secret(
3079        root_secret: &DerivableSecret,
3080        config: &ClientConfig,
3081    ) -> DerivableSecret {
3082        root_secret.federation_key(&config.global.calculate_federation_id())
3083    }
3084
3085    /// Register to receiver all new transient (unpersisted) events
3086    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
3087        self.log_event_added_transient_tx.subscribe()
3088    }
3089}
3090
3091/// Fetches the encoded client secret from the database and decodes it.
3092/// If an encoded client secret is not present in the database, or if
3093/// decoding fails, an error is returned.
3094pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
3095    let mut tx = db.begin_transaction_nc().await;
3096    let client_secret = tx.get_value(&EncodedClientSecretKey).await;
3097
3098    match client_secret {
3099        Some(client_secret) => {
3100            T::consensus_decode(&mut client_secret.as_slice(), &ModuleRegistry::default())
3101                .map_err(|e| anyhow!("Decoding failed: {e}"))
3102        }
3103        None => bail!("Encoded client secret not present in DB"),
3104    }
3105}
3106
3107pub fn client_decoders<'a>(
3108    registry: &ModuleInitRegistry<DynClientModuleInit>,
3109    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
3110) -> ModuleDecoderRegistry {
3111    let mut modules = BTreeMap::new();
3112    for (id, kind) in module_kinds {
3113        let Some(init) = registry.get(kind) else {
3114            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
3115            continue;
3116        };
3117
3118        modules.insert(
3119            id,
3120            (
3121                kind.clone(),
3122                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
3123            ),
3124        );
3125    }
3126    ModuleDecoderRegistry::from(modules)
3127}