fedimint_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::explicit_deref_methods)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::needless_lifetimes)]
10#![allow(clippy::return_self_not_must_use)]
11#![allow(clippy::too_many_lines)]
12#![allow(clippy::type_complexity)]
13
14//! # Client library for fedimintd
15//!
16//! This library provides a client interface to build module clients that can be
17//! plugged together into a fedimint client that exposes a high-level interface
18//! for application authors to integrate with.
19//!
20//! ## Module Clients
21//! Module clients have to at least implement the [`module::ClientModule`] trait
22//! and a factory struct implementing [`module::init::ClientModuleInit`]. The
23//! `ClientModule` trait defines the module types (tx inputs, outputs, etc.) as
24//! well as the module's [state machines](sm::State).
25//!
26//! ### State machines
27//! State machines are spawned when starting operations and drive them
28//! forward in the background. All module state machines are run by a central
29//! [`sm::Executor`]. This means typically starting an operation shall return
30//! instantly.
31//!
32//! For example when doing a deposit the function starting it would immediately
33//! return a deposit address and a [`OperationId`] (important concept, highly
34//! recommended to read the docs) while spawning a state machine checking the
35//! blockchain for incoming bitcoin transactions. The progress of these state
36//! machines can then be *observed* using the operation id, but no further user
37//! interaction is required to drive them forward.
38//!
39//! ### State Machine Contexts
40//! State machines have access to both a [global
41//! context](`DynGlobalClientContext`) as well as to a [module-specific
42//! context](module::ClientModule::context).
43//!
44//! The global context provides access to the federation API and allows to claim
45//! module outputs (and transferring the value into the client's wallet), which
46//! can be used for refunds.
47//!
48//! The client-specific context can be used for other purposes, such as
49//! supplying config to the state transitions or giving access to other APIs
50//! (e.g. LN gateway in case of the lightning module).
51//!
52//! ### Extension traits
53//! The modules themselves can only create inputs and outputs that then have to
54//! be combined into transactions by the user and submitted via
55//! [`Client::finalize_and_submit_transaction`]. To make this easier most module
56//! client implementations contain an extension trait which is implemented for
57//! [`Client`] and allows to create the most typical fedimint transactions with
58//! a single function call.
59//!
60//! To observe the progress each high level operation function should be
61//! accompanied by one returning a stream of high-level operation updates.
62//! Internally that stream queries the state machines belonging to the
63//! operation to determine the high-level operation state.
64//!
65//! ### Primary Modules
66//! Not all modules have the ability to hold money for long. E.g. the lightning
67//! module and its smart contracts are only used to incentivize LN payments, not
//! to hold money. The mint module on the other hand holds e-cash notes and can
69//! thus be used to fund transactions and to absorb change. Module clients with
70//! this ability should implement [`ClientModule::supports_being_primary`] and
71//! related methods.
72//!
//! For an example of a client module see [the mint client](https://github.com/fedimint/fedimint/blob/master/modules/fedimint-mint-client/src/lib.rs).
74//!
75//! ## Client
76//! The [`Client`] struct is the main entry point for application authors. It is
77//! constructed using its builder which can be obtained via [`Client::builder`].
78//! The supported module clients have to be chosen at compile time while the
79//! actually available ones will be determined by the config loaded at runtime.
80//!
81//! For a hacky instantiation of a complete client see the [`ng` subcommand of `fedimint-cli`](https://github.com/fedimint/fedimint/blob/55f9d88e17d914b92a7018de677d16e57ed42bf6/fedimint-cli/src/ng.rs#L56-L73).
82
83use std::collections::{BTreeMap, HashSet};
84use std::fmt::{Debug, Formatter};
85use std::future::pending;
86use std::ops::{self, Range};
87use std::pin::Pin;
88use std::sync::Arc;
89use std::time::{Duration, SystemTime, UNIX_EPOCH};
90
91use anyhow::{anyhow, bail, ensure, format_err, Context};
92use api::ClientRawFederationApiExt as _;
93use async_stream::{stream, try_stream};
94use backup::ClientBackup;
95use bitcoin::secp256k1;
96use db::{
97    apply_migrations_client, apply_migrations_core_client, get_core_client_database_migrations,
98    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientInitStateKey,
99    ClientModuleRecovery, ClientPreRootSecretHashKey, EncodedClientSecretKey, InitMode,
100    PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey,
101};
102use fedimint_api_client::api::net::Connector;
103use fedimint_api_client::api::{
104    ApiVersionSet, DynGlobalApi, DynModuleApi, FederationApiExt, GlobalFederationApiWithCacheExt,
105    IGlobalFederationApi, ReconnectFederationApi,
106};
107use fedimint_core::config::{
108    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
109};
110use fedimint_core::core::{
111    DynInput, DynOutput, IInput, IOutput, IntoDynInstance as _, ModuleInstanceId, ModuleKind,
112    OperationId,
113};
114use fedimint_core::db::{
115    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
116    IDatabaseTransactionOpsCoreTyped, NonCommittable,
117};
118use fedimint_core::encoding::{Decodable, Encodable};
119use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
120use fedimint_core::invite_code::InviteCode;
121use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
122use fedimint_core::module::{
123    ApiAuth, ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
124    SupportedCoreApiVersions, SupportedModuleApiVersions,
125};
126use fedimint_core::net::api_announcement::SignedApiAnnouncement;
127use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
128use fedimint_core::transaction::Transaction;
129use fedimint_core::util::{
130    backoff_util, retry, BoxStream, FmtCompact as _, FmtCompactAnyhow as _, NextOrPending, SafeUrl,
131};
132use fedimint_core::{
133    apply, async_trait_maybe_send, dyn_newtype_define, fedimint_build_code_version_env,
134    maybe_add_send, maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
135    TransactionId,
136};
137pub use fedimint_derive_secret as derivable_secret;
138use fedimint_derive_secret::DerivableSecret;
139use fedimint_eventlog::{
140    self, run_event_log_ordering_task, DBTransactionEventLogExt, Event, EventKind, EventLogEntry,
141    EventLogId,
142};
143use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
144use futures::stream::FuturesUnordered;
145use futures::{Future, Stream, StreamExt};
146use meta::{LegacyMetaSource, MetaService};
147use module::recovery::RecoveryProgress;
148use module::{ClientContextIface, DynClientModule, FinalClientIface, IdxRange, OutPointRange};
149use rand::thread_rng;
150use secp256k1::{PublicKey, Secp256k1};
151use secret::{DeriveableSecretClientExt, PlainRootSecretStrategy, RootSecretStrategy as _};
152use serde::{Deserialize, Serialize};
153use thiserror::Error;
154#[cfg(not(target_family = "wasm"))]
155use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
156use tokio::sync::{broadcast, watch, RwLock};
157use tokio_stream::wrappers::WatchStream;
158use tracing::{debug, error, info, trace, warn};
159use transaction::{
160    ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputSM, TxSubmissionStatesSM,
161};
162
163use crate::api_announcements::{get_api_urls, run_api_announcement_sync, ApiAnnouncementPrefix};
164use crate::api_version_discovery::discover_common_api_versions_set;
165use crate::backup::Metadata;
166use crate::db::{ClientMetadataKey, ClientModuleRecoveryState, InitState, OperationLogKey};
167use crate::module::init::{
168    ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
169};
170use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
171use crate::oplog::OperationLog;
172use crate::sm::executor::{
173    ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
174};
175use crate::sm::{ClientSMDatabaseTransaction, DynState, Executor, IState, Notifier, State};
176use crate::transaction::{
177    tx_submission_sm_decoder, ClientInput, ClientOutputBundle, TransactionBuilder,
178    TxSubmissionContext, TxSubmissionStates, TRANSACTION_SUBMISSION_MODULE_INSTANCE,
179};
180
181pub mod api;
182
183/// Client backup
184pub mod backup;
185/// Database keys used by the client
186pub mod db;
187/// Environment variables
188pub mod envs;
189/// Module client interface definitions
190pub mod module;
191/// Operation log subsystem of the client
192pub mod oplog;
193/// Secret handling & derivation
194pub mod secret;
195/// Client state machine interfaces and executor implementation
196pub mod sm;
197/// Structs and interfaces to construct Fedimint transactions
198pub mod transaction;
199
200mod api_version_discovery;
201
202pub mod api_announcements;
203/// Management of meta fields
204pub mod meta;
205
/// Event payload of kind `tx-created`, carrying a transaction id and an
/// operation id.
// NOTE(review): presumably logged when the client creates a transaction —
// confirm at the logging call site.
#[derive(Serialize, Deserialize)]
pub struct TxCreatedEvent {
    /// Id of the transaction the event refers to
    txid: TransactionId,
    /// Id of the operation the event refers to
    operation_id: OperationId,
}

impl Event for TxCreatedEvent {
    // Not attributed to any module kind.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("tx-created");
}
217
/// Event payload of kind `tx-accepted`, carrying a transaction id and an
/// operation id.
// NOTE(review): presumably logged once the federation accepted the
// transaction — confirm at the logging call site.
#[derive(Serialize, Deserialize)]
pub struct TxAcceptedEvent {
    /// Id of the transaction the event refers to
    txid: TransactionId,
    /// Id of the operation the event refers to
    operation_id: OperationId,
}

impl Event for TxAcceptedEvent {
    // Not attributed to any module kind.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("tx-accepted");
}
229
/// Event payload of kind `tx-rejected`, carrying a transaction id, an error
/// string and an operation id.
// NOTE(review): presumably logged when the federation rejected the
// transaction — confirm at the logging call site.
#[derive(Serialize, Deserialize)]
pub struct TxRejectedEvent {
    /// Id of the transaction the event refers to
    txid: TransactionId,
    /// Error description as a plain string
    error: String,
    /// Id of the operation the event refers to
    operation_id: OperationId,
}
impl Event for TxRejectedEvent {
    // Not attributed to any module kind.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("tx-rejected");
}
241
/// Event payload of kind `module-recovery-started`, carrying the instance id
/// of a module.
// NOTE(review): presumably logged when recovery of that module instance
// begins — confirm at the logging call site.
#[derive(Serialize, Deserialize)]
pub struct ModuleRecoveryStarted {
    /// Instance id of the module the event refers to
    module_id: ModuleInstanceId,
}

impl Event for ModuleRecoveryStarted {
    // Not attributed to any module kind; the affected instance is part of
    // the payload instead.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("module-recovery-started");
}
252
/// Event payload of kind `module-recovery-completed`, carrying the instance
/// id of a module.
// NOTE(review): presumably logged when recovery of that module instance
// finished — confirm at the logging call site.
#[derive(Serialize, Deserialize)]
pub struct ModuleRecoveryCompleted {
    /// Instance id of the module the event refers to
    module_id: ModuleInstanceId,
}

impl Event for ModuleRecoveryCompleted {
    // Not attributed to any module kind; the affected instance is part of
    // the payload instead.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("module-recovery-completed");
}
263
/// [`ClientInput`] whose input is a boxed `dyn IInput` trait object
pub type InstancelessDynClientInput = ClientInput<Box<maybe_add_send_sync!(dyn IInput + 'static)>>;

/// [`ClientInputSM`] whose state is a boxed `dyn IState` trait object
pub type InstancelessDynClientInputSM =
    ClientInputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;

/// [`ClientInputBundle`] of boxed `dyn IInput` inputs and boxed `dyn IState`
/// state machines. The module instance id is supplied later through
/// `into_dyn`.
pub type InstancelessDynClientInputBundle = ClientInputBundle<
    Box<maybe_add_send_sync!(dyn IInput + 'static)>,
    Box<maybe_add_send_sync!(dyn IState + 'static)>,
>;

/// [`ClientOutput`] whose output is a boxed `dyn IOutput` trait object
pub type InstancelessDynClientOutput =
    ClientOutput<Box<maybe_add_send_sync!(dyn IOutput + 'static)>>;

/// [`ClientOutputSM`] whose state is a boxed `dyn IState` trait object
pub type InstancelessDynClientOutputSM =
    ClientOutputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
/// [`ClientOutputBundle`] of boxed `dyn IOutput` outputs and boxed
/// `dyn IState` state machines. The module instance id is supplied later
/// through `into_dyn`.
pub type InstancelessDynClientOutputBundle = ClientOutputBundle<
    Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
    Box<maybe_add_send_sync!(dyn IState + 'static)>,
>;
283
/// Error returned when adding state machines to the executor fails
#[derive(Debug, Error)]
pub enum AddStateMachinesError {
    /// The state to be added is already present in the database
    #[error("State already exists in database")]
    StateAlreadyExists,
    /// Any other failure; convertible from [`anyhow::Error`] via `?`
    #[error("Got {0}")]
    Other(#[from] anyhow::Error),
}

/// Result of adding state machines: `Ok(())` or an [`AddStateMachinesError`]
pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
293
/// Object-safe interface behind [`DynGlobalClientContext`].
///
/// The `*_dyn` methods take boxed trait objects; the typed convenience
/// wrappers live on [`DynGlobalClientContext`].
#[apply(async_trait_maybe_send!)]
pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
    /// Returns the client's module API client, so that module-specific
    /// calls can be made
    fn module_api(&self) -> DynModuleApi;

    /// Returns the client's [`ClientConfig`]
    async fn client_config(&self) -> ClientConfig;

    /// Returns a reference to the client's federation API client. The provided
    /// interface [`IGlobalFederationApi`] typically does not provide the
    /// necessary functionality, for this extension traits like
    /// [`fedimint_api_client::api::IGlobalFederationApi`] have to be used.
    // TODO: Could be removed in favor of client() except for testing
    fn api(&self) -> &DynGlobalApi;

    /// Returns the registry of module decoders
    fn decoders(&self) -> &ModuleDecoderRegistry;

    /// This function is mostly meant for internal use, you are probably looking
    /// for [`DynGlobalClientContext::claim_inputs`].
    ///
    /// On success returns an [`OutPointRange`].
    // NOTE(review): an earlier version of this comment described the return
    // value as "transaction id plus optional change `OutPoint`"; the signature
    // now returns an `OutPointRange` — confirm what exactly the range covers.
    async fn claim_inputs_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        inputs: InstancelessDynClientInputBundle,
    ) -> anyhow::Result<OutPointRange>;

    /// This function is mostly meant for internal use, you are probably looking
    /// for [`DynGlobalClientContext::fund_output`].
    ///
    /// On success returns an [`OutPointRange`].
    // NOTE(review): an earlier version of this comment described the return
    // value as "transaction id plus optional change `OutPoint`"; the signature
    // now returns an `OutPointRange` — confirm what exactly the range covers.
    async fn fund_output_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        outputs: InstancelessDynClientOutputBundle,
    ) -> anyhow::Result<OutPointRange>;

    /// Adds a state machine to the executor.
    async fn add_state_machine_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult;

    /// Logs an event given as already-serialized JSON `payload`, together
    /// with its `kind`, optional originating `module` and `persist` flag.
    /// You are probably looking for the typed `log_event` wrapper on
    /// [`DynGlobalClientContext`].
    async fn log_event_json(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        kind: EventKind,
        module: Option<(ModuleKind, ModuleInstanceId)>,
        payload: serde_json::Value,
        persist: bool,
    );

    /// Returns a stream of transaction submission state machine updates
    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM>;
}
349
// Placeholder implementation on the unit type: every method panics via
// `unimplemented!`. It backs `DynGlobalClientContext::new_fake` and is only
// meant for tests that never call into the context.
#[apply(async_trait_maybe_send!)]
impl IGlobalClientContext for () {
    fn module_api(&self) -> DynModuleApi {
        unimplemented!("fake implementation, only for tests");
    }

    async fn client_config(&self) -> ClientConfig {
        unimplemented!("fake implementation, only for tests");
    }

    fn api(&self) -> &DynGlobalApi {
        unimplemented!("fake implementation, only for tests");
    }

    fn decoders(&self) -> &ModuleDecoderRegistry {
        unimplemented!("fake implementation, only for tests");
    }

    async fn claim_inputs_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _input: InstancelessDynClientInputBundle,
    ) -> anyhow::Result<OutPointRange> {
        unimplemented!("fake implementation, only for tests");
    }

    async fn fund_output_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _outputs: InstancelessDynClientOutputBundle,
    ) -> anyhow::Result<OutPointRange> {
        unimplemented!("fake implementation, only for tests");
    }

    async fn add_state_machine_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult {
        unimplemented!("fake implementation, only for tests");
    }

    async fn log_event_json(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _kind: EventKind,
        _module: Option<(ModuleKind, ModuleInstanceId)>,
        _payload: serde_json::Value,
        _persist: bool,
    ) {
        unimplemented!("fake implementation, only for tests");
    }

    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
        unimplemented!("fake implementation, only for tests");
    }
}
407
// Declares the `DynGlobalClientContext` newtype over an `Arc` of the
// `IGlobalClientContext` trait object; the wrapped value is stored in a field
// named `inner` (see the `From<Arc<T>>` impl in this file).
// NOTE(review): the exact set of impls the macro generates is defined in
// `fedimint_core` — confirm there.
dyn_newtype_define! {
    /// Global state and functionality provided to all state machines running in the
    /// client
    #[derive(Clone)]
    pub DynGlobalClientContext(Arc<IGlobalClientContext>)
}
414
415impl DynGlobalClientContext {
416    pub fn new_fake() -> Self {
417        DynGlobalClientContext::from(())
418    }
419
420    pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
421        self.transaction_update_stream()
422            .await
423            .filter_map(|tx_update| {
424                std::future::ready(match tx_update.state {
425                    TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
426                    TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
427                        Some(Err(submit_error))
428                    }
429                    _ => None,
430                })
431            })
432            .next_or_pending()
433            .await
434    }
435
436    pub async fn claim_inputs<I, S>(
437        &self,
438        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
439        inputs: ClientInputBundle<I, S>,
440    ) -> anyhow::Result<OutPointRange>
441    where
442        I: IInput + MaybeSend + MaybeSync + 'static,
443        S: IState + MaybeSend + MaybeSync + 'static,
444    {
445        self.claim_inputs_dyn(dbtx, inputs.into_instanceless())
446            .await
447    }
448
449    /// Creates a transaction with the supplied output and funding added by the
450    /// primary module if possible. If the primary module does not have the
451    /// required funds this function fails.
452    ///
453    /// The transactions submission state machine as well as the state machines
454    /// for the funding inputs are generated automatically. The caller is
455    /// responsible for the output's state machines, should there be any
456    /// required.
457    pub async fn fund_output<O, S>(
458        &self,
459        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
460        outputs: ClientOutputBundle<O, S>,
461    ) -> anyhow::Result<OutPointRange>
462    where
463        O: IOutput + MaybeSend + MaybeSync + 'static,
464        S: IState + MaybeSend + MaybeSync + 'static,
465    {
466        self.fund_output_dyn(dbtx, outputs.into_instanceless())
467            .await
468    }
469
470    /// Allows adding state machines from inside a transition to the executor.
471    /// The added state machine belongs to the same module instance as the state
472    /// machine from inside which it was spawned.
473    pub async fn add_state_machine<S>(
474        &self,
475        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
476        sm: S,
477    ) -> AddStateMachinesResult
478    where
479        S: State + MaybeSend + MaybeSync + 'static,
480    {
481        self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
482    }
483
484    async fn log_event<E>(&self, dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, event: E)
485    where
486        E: Event + Send,
487    {
488        self.log_event_json(
489            dbtx,
490            E::KIND,
491            E::MODULE.map(|m| (m, dbtx.module_id())),
492            serde_json::to_value(&event).expect("Payload serialization can't fail"),
493            <E as Event>::PERSIST,
494        )
495        .await;
496    }
497}
498
499fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>(
500    state_gen: StateGenerator<S>,
501) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> {
502    Arc::new(move |out_point_range| {
503        let states: Vec<S> = state_gen(out_point_range);
504        states
505            .into_iter()
506            .map(|state| box_up_state(state))
507            .collect()
508    })
509}
510
/// Boxes a concrete state into a `dyn IState` trait object.
///
/// Not sure why I couldn't just directly call `Box::new` in
/// [`states_to_instanceless_dyn`], but this fixed it.
fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> {
    Box::new(state)
}
516
517impl<T> From<Arc<T>> for DynGlobalClientContext
518where
519    T: IGlobalClientContext,
520{
521    fn from(inner: Arc<T>) -> Self {
522        DynGlobalClientContext { inner }
523    }
524}
525
526// TODO: impl `Debug` for `Client` and derive here
527impl Debug for Client {
528    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
529        write!(f, "Client")
530    }
531}
532
/// Global state given to a specific client module and state. It knows inside
/// which module instance and operation it is used, so that modules do not
/// have to be aware of their own instance id etc.
#[derive(Clone, Debug)]
struct ModuleGlobalClientContext {
    /// Client the context delegates to
    client: Arc<Client>,
    /// Module instance used for the module API and for tagging inputs,
    /// outputs and state machines
    module_instance_id: ModuleInstanceId,
    /// Operation under which transactions are submitted and whose
    /// transaction updates are streamed
    operation: OperationId,
}
542
543#[apply(async_trait_maybe_send!)]
544impl IGlobalClientContext for ModuleGlobalClientContext {
545    fn module_api(&self) -> DynModuleApi {
546        self.api().with_module(self.module_instance_id)
547    }
548
549    fn api(&self) -> &DynGlobalApi {
550        &self.client.api
551    }
552
553    fn decoders(&self) -> &ModuleDecoderRegistry {
554        self.client.decoders()
555    }
556
557    async fn client_config(&self) -> ClientConfig {
558        self.client.config().await
559    }
560
561    async fn claim_inputs_dyn(
562        &self,
563        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
564        inputs: InstancelessDynClientInputBundle,
565    ) -> anyhow::Result<OutPointRange> {
566        let tx_builder =
567            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
568
569        self.client
570            .finalize_and_submit_transaction_inner(
571                &mut dbtx.global_tx().to_ref_nc(),
572                self.operation,
573                tx_builder,
574            )
575            .await
576    }
577
578    async fn fund_output_dyn(
579        &self,
580        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
581        outputs: InstancelessDynClientOutputBundle,
582    ) -> anyhow::Result<OutPointRange> {
583        let tx_builder =
584            TransactionBuilder::new().with_outputs(outputs.into_dyn(self.module_instance_id));
585
586        self.client
587            .finalize_and_submit_transaction_inner(
588                &mut dbtx.global_tx().to_ref_nc(),
589                self.operation,
590                tx_builder,
591            )
592            .await
593    }
594
595    async fn add_state_machine_dyn(
596        &self,
597        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
598        sm: Box<maybe_add_send_sync!(dyn IState)>,
599    ) -> AddStateMachinesResult {
600        let state = DynState::from_parts(self.module_instance_id, sm);
601
602        self.client
603            .executor
604            .add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
605            .await
606    }
607
608    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
609        self.client.transaction_update_stream(self.operation).await
610    }
611
612    async fn log_event_json(
613        &self,
614        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
615        kind: EventKind,
616        module: Option<(ModuleKind, ModuleInstanceId)>,
617        payload: serde_json::Value,
618        persist: bool,
619    ) {
620        self.client
621            .log_event_raw_dbtx(
622                dbtx.global_tx(),
623                kind,
624                module,
625                serde_json::to_vec(&payload).expect("Serialization can't fail"),
626                persist,
627            )
628            .await;
629    }
630}
631
632fn states_add_instance(
633    module_instance_id: ModuleInstanceId,
634    state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
635) -> StateGenerator<DynState> {
636    Arc::new(move |out_point_range| {
637        let states = state_gen(out_point_range);
638        Iterator::collect(
639            states
640                .into_iter()
641                .map(|state| DynState::from_parts(module_instance_id, state)),
642        )
643    })
644}
645
/// User handle to the [`Client`] instance
///
/// On the drop of [`ClientHandle`] the client will be shut-down, and resources
/// it used freed.
///
/// Notably it [`ops::Deref`]s to the [`Client`] where most
/// methods live.
///
/// Put this in an Arc to clone it (see [`ClientHandleArc`]).
#[derive(Debug)]
pub struct ClientHandle {
    /// The managed client. Set on construction and taken (leaving `None`)
    /// when the handle shuts the client down, so `None` means "already shut
    /// down".
    inner: Option<Arc<Client>>,
}

/// An alias for a reference counted [`ClientHandle`]
pub type ClientHandleArc = Arc<ClientHandle>;
662
663impl ClientHandle {
664    /// Create
665    fn new(inner: Arc<Client>) -> Self {
666        ClientHandle {
667            inner: inner.into(),
668        }
669    }
670
671    fn as_inner(&self) -> &Arc<Client> {
672        self.inner.as_ref().expect("Inner always set")
673    }
674
675    pub fn start_executor(&self) {
676        self.as_inner().start_executor();
677    }
678
679    /// Shutdown the client.
680    pub async fn shutdown(mut self) {
681        self.shutdown_inner().await;
682    }
683
684    async fn shutdown_inner(&mut self) {
685        let Some(inner) = self.inner.take() else {
686            error!(
687                target: LOG_CLIENT,
688                "ClientHandleShared::shutdown called twice"
689            );
690            return;
691        };
692        inner.executor.stop_executor();
693        let db = inner.db.clone();
694        debug!(target: LOG_CLIENT, "Waiting for client task group to shut down");
695        if let Err(err) = inner
696            .task_group
697            .clone()
698            .shutdown_join_all(Some(Duration::from_secs(30)))
699            .await
700        {
701            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Error waiting for client task group to shut down");
702        }
703
704        let client_strong_count = Arc::strong_count(&inner);
705        debug!(target: LOG_CLIENT, "Dropping last handle to Client");
706        // We are sure that no background tasks are running in the client anymore, so we
707        // can drop the (usually) last inner reference.
708        drop(inner);
709
710        if client_strong_count != 1 {
711            debug!(target: LOG_CLIENT, count = client_strong_count - 1, LOG_CLIENT, "External Client references remaining after last handle dropped");
712        }
713
714        let db_strong_count = db.strong_count();
715        if db_strong_count != 1 {
716            debug!(target: LOG_CLIENT, count = db_strong_count - 1, "External DB references remaining after last handle dropped");
717        }
718        trace!(target: LOG_CLIENT, "Dropped last handle to Client");
719    }
720
721    /// Restart the client
722    ///
723    /// Returns false if there are other clones of [`ClientHandle`], or starting
724    /// the client again failed for some reason.
725    ///
726    /// Notably it will re-use the original [`Database`] handle, and not attempt
727    /// to open it again.
728    pub async fn restart(self) -> anyhow::Result<ClientHandle> {
729        let (builder, config, api_secret, root_secret) = {
730            let client = self
731                .inner
732                .as_ref()
733                .ok_or_else(|| format_err!("Already stopped"))?;
734            let builder = ClientBuilder::from_existing(client);
735            let config = client.config().await;
736            let api_secret = client.api_secret.clone();
737            let root_secret = client.root_secret.clone();
738
739            (builder, config, api_secret, root_secret)
740        };
741        self.shutdown().await;
742
743        builder.build(root_secret, config, api_secret, false).await
744    }
745}
746
747impl ops::Deref for ClientHandle {
748    type Target = Client;
749
750    fn deref(&self) -> &Self::Target {
751        self.inner.as_ref().expect("Must have inner client set")
752    }
753}
754
755/// We need a separate drop implementation for `Client` that triggers
756/// `Executor::stop_executor` even though the `Drop` implementation of
757/// `ExecutorInner` should already take care of that. The reason is that as long
758/// as the executor task is active there may be a cycle in the
759/// `Arc<Client>`s such that at least one `Executor` never gets dropped.
760impl Drop for ClientHandle {
761    fn drop(&mut self) {
762        if self.inner.is_none() {
763            return;
764        }
765
766        // We can't use block_on in single-threaded mode or wasm
767        #[cfg(target_family = "wasm")]
768        let can_block = false;
769        #[cfg(not(target_family = "wasm"))]
770        // nosemgrep: ban-raw-block-on
771        let can_block = RuntimeHandle::current().runtime_flavor() != RuntimeFlavor::CurrentThread;
772        if !can_block {
773            let inner = self.inner.take().expect("Must have inner client set");
774            inner.executor.stop_executor();
775            if cfg!(target_family = "wasm") {
776                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on wasm, call ClientHandle::shutdown manually.");
777            } else {
778                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on current thread runtime, call ClientHandle::shutdown manually.");
779            }
780            return;
781        }
782
783        debug!(target: LOG_CLIENT, "Shutting down the Client on last handle drop");
784        #[cfg(not(target_family = "wasm"))]
785        runtime::block_in_place(|| {
786            runtime::block_on(self.shutdown_inner());
787        });
788    }
789}
790
791/// List of core api versions supported by the implementation.
792/// Notably `major` version is the one being supported, and corresponding
793/// `minor` version is the one required (for given `major` version).
794const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
795    &[ApiVersion { major: 0, minor: 0 }];
796
/// Alias for the executor's [`ContextGen`]
// NOTE(review): presumably the generator producing the per-module global
// context handed to state machines — confirm in `sm::executor`.
pub type ModuleGlobalContextGen = ContextGen;
798
/// Resources particular to a module instance
///
/// Bundles a borrowed module with its instance id, database and API handle.
/// It dereferences to the module itself.
pub struct ClientModuleInstance<'m, M: ClientModule> {
    /// Instance id of the module
    pub id: ModuleInstanceId,
    /// Module-specific DB
    pub db: Database,
    /// Module-specific API
    pub api: DynModuleApi,

    /// Borrowed module, accessible via `inner()` or `Deref`
    module: &'m M,
}
810
811impl<'m, M: ClientModule> ClientModuleInstance<'m, M> {
812    /// Get a reference to the module
813    pub fn inner(&self) -> &'m M {
814        self.module
815    }
816}
817
818impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
819where
820    M: ClientModule,
821{
822    type Target = M;
823
824    fn deref(&self) -> &Self::Target {
825        self.module
826    }
827}
828
/// Main client type
///
/// A handle and API for interacting with a single federation. End user
/// applications that want to support interacting with multiple federations at
/// the same time, will need to instantiate and manage multiple instances of
/// this struct.
///
/// Under the hood it is starting and managing service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
pub struct Client {
    config: RwLock<ClientConfig>,
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    db: Database,
    federation_id: FederationId,
    federation_config_meta: BTreeMap<String, String>,
    primary_module_instance: ModuleInstanceId,
    modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,
    connector: Connector,

    /// Task group the client's background tasks belong to; joined on
    /// shutdown by [`ClientHandle`]
    task_group: TaskGroup,

    /// Updates about client recovery progress
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    /// Internal client sender to wake up log ordering task every time a
    /// (unordered) log event is added.
    log_ordering_wakeup_tx: watch::Sender<()>,
    /// Receiver for events fired every time (ordered) log event is added.
    log_event_added_rx: watch::Receiver<()>,
    // NOTE(review): presumably broadcasts log entries that are not
    // persisted ("transient") — confirm at the send site.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
}
873
874impl Client {
875    /// Initialize a client builder that can be configured to create a new
876    /// client.
877    pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
878        apply_migrations_core_client(
879            &db,
880            "fedimint-client".to_string(),
881            get_core_client_database_migrations(),
882        )
883        .await?;
884        Ok(ClientBuilder::new(db))
885    }
886
    /// Borrow the client's global federation API as a trait object.
    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }
890
    /// Get an owned clone of the client's global federation API handle.
    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }
894
    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
    ///
    /// The group is returned by reference; clone it to obtain an owned handle.
    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }
899
    /// Borrow the client's state machine [`Executor`].
    ///
    /// Useful for our CLI tooling, not meant for external use
    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
905
906    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
907        let mut dbtx = db.begin_transaction_nc().await;
908        dbtx.get_value(&ClientConfigKey).await
909    }
910
911    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
912        let mut dbtx = db.begin_transaction_nc().await;
913        dbtx.get_value(&ApiSecretKey).await
914    }
915
916    pub async fn store_encodable_client_secret<T: Encodable>(
917        db: &Database,
918        secret: T,
919    ) -> anyhow::Result<()> {
920        let mut dbtx = db.begin_transaction().await;
921
922        // Don't overwrite an existing secret
923        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
924            bail!("Encoded client secret already exists, cannot overwrite")
925        }
926
927        let encoded_secret = T::consensus_encode_to_vec(&secret);
928        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
929            .await;
930        dbtx.commit_tx().await;
931        Ok(())
932    }
933
934    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
935        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
936            bail!("Encoded client secret not present in DB")
937        };
938
939        Ok(secret)
940    }
941    pub async fn load_decodable_client_secret_opt<T: Decodable>(
942        db: &Database,
943    ) -> anyhow::Result<Option<T>> {
944        let mut dbtx = db.begin_transaction_nc().await;
945
946        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
947
948        Ok(match client_secret {
949            Some(client_secret) => Some(
950                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
951                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
952            ),
953            None => None,
954        })
955    }
956
957    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
958        let client_secret =
959            if let Ok(secret) = Self::load_decodable_client_secret::<[u8; 64]>(db).await {
960                secret
961            } else {
962                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
963                Self::store_encodable_client_secret(db, secret)
964                    .await
965                    .expect("Storing client secret must work");
966                secret
967            };
968        Ok(client_secret)
969    }
970
971    pub async fn is_initialized(db: &Database) -> bool {
972        Self::get_config_from_db(db).await.is_some()
973    }
974
975    pub fn start_executor(self: &Arc<Self>) {
976        debug!(
977            target: LOG_CLIENT,
978            "Starting fedimint client executor (version: {})",
979            fedimint_build_code_version_env!()
980        );
981        self.executor.start_executor(self.context_gen());
982    }
983
    /// Id of the federation this client is connected to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }
987
988    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
989        let client_inner = Arc::downgrade(self);
990        Arc::new(move |module_instance, operation| {
991            ModuleGlobalClientContext {
992                client: client_inner
993                    .clone()
994                    .upgrade()
995                    .expect("ModuleGlobalContextGen called after client was dropped"),
996                module_instance_id: module_instance,
997                operation,
998            }
999            .into()
1000        })
1001    }
1002
1003    pub async fn config(&self) -> ClientConfig {
1004        self.config.read().await.clone()
1005    }
1006
    /// The client's API secret, if one is set.
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
1010
    /// The module decoder registry held by this client.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
1014
1015    /// Returns a reference to the module, panics if not found
1016    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1017        self.try_get_module(instance)
1018            .expect("Module instance not found")
1019    }
1020
1021    fn try_get_module(
1022        &self,
1023        instance: ModuleInstanceId,
1024    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
1025        Some(self.modules.get(instance)?.as_ref())
1026    }
1027
1028    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
1029        self.modules.get(instance).is_some()
1030    }
1031
1032    /// Returns the input amount and output amount of a transaction
1033    ///
1034    /// # Panics
1035    /// If any of the input or output versions in the transaction builder are
1036    /// unknown by the respective module.
1037    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
1038        // FIXME: prevent overflows, currently not suitable for untrusted input
1039        let mut in_amount = Amount::ZERO;
1040        let mut out_amount = Amount::ZERO;
1041        let mut fee_amount = Amount::ZERO;
1042
1043        for input in builder.inputs() {
1044            let module = self.get_module(input.input.module_instance_id());
1045
1046            let item_fee = module.input_fee(input.amount, &input.input).expect(
1047                "We only build transactions with input versions that are supported by the module",
1048            );
1049
1050            in_amount += input.amount;
1051            fee_amount += item_fee;
1052        }
1053
1054        for output in builder.outputs() {
1055            let module = self.get_module(output.output.module_instance_id());
1056
1057            let item_fee = module.output_fee(output.amount, &output.output).expect(
1058                "We only build transactions with output versions that are supported by the module",
1059            );
1060
1061            out_amount += output.amount;
1062            fee_amount += item_fee;
1063        }
1064
1065        (in_amount, out_amount + fee_amount)
1066    }
1067
1068    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1069        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
1070    }
1071
1072    /// Get metadata value from the federation config itself
1073    pub fn get_config_meta(&self, key: &str) -> Option<String> {
1074        self.federation_config_meta.get(key).cloned()
1075    }
1076
    /// Get a clone of the client's root secret.
    fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }
1080
1081    pub async fn add_state_machines(
1082        &self,
1083        dbtx: &mut DatabaseTransaction<'_>,
1084        states: Vec<DynState>,
1085    ) -> AddStateMachinesResult {
1086        self.executor.add_state_machines_dbtx(dbtx, states).await
1087    }
1088
1089    // TODO: implement as part of [`OperationLog`]
1090    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
1091        let active_states = self.executor.get_active_states().await;
1092        let mut active_operations = HashSet::with_capacity(active_states.len());
1093        let mut dbtx = self.db().begin_transaction_nc().await;
1094        for (state, _) in active_states {
1095            let operation_id = state.operation_id();
1096            if dbtx
1097                .get_value(&OperationLogKey { operation_id })
1098                .await
1099                .is_some()
1100            {
1101                active_operations.insert(operation_id);
1102            }
1103        }
1104        active_operations
1105    }
1106
    /// Borrow the client's [`OperationLog`].
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }
1110
    /// Get the meta service, which is used to read meta fields.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
1115
1116    /// Get the meta manager to read meta fields.
1117    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
1118        let meta_service = self.meta_service();
1119        let ts = meta_service
1120            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
1121            .await
1122            .and_then(|v| v.value)?;
1123        Some(UNIX_EPOCH + Duration::from_secs(ts))
1124    }
1125
1126    /// Adds funding to a transaction or removes over-funding via change.
1127    async fn finalize_transaction(
1128        &self,
1129        dbtx: &mut DatabaseTransaction<'_>,
1130        operation_id: OperationId,
1131        mut partial_transaction: TransactionBuilder,
1132    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
1133        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1134
1135        let (added_input_bundle, change_outputs) = self
1136            .primary_module()
1137            .create_final_inputs_and_outputs(
1138                self.primary_module_instance,
1139                dbtx,
1140                operation_id,
1141                input_amount,
1142                output_amount,
1143            )
1144            .await?;
1145
1146        // This is the range of  outputs that will be added to the transaction
1147        // in order to balance it. Notice that it may stay empty in case the transaction
1148        // is already balanced.
1149        let change_range = Range {
1150            start: partial_transaction.outputs().count() as u64,
1151            end: (partial_transaction.outputs().count() + change_outputs.outputs.len()) as u64,
1152        };
1153
1154        partial_transaction = partial_transaction
1155            .with_inputs(added_input_bundle)
1156            .with_outputs(change_outputs);
1157
1158        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1159
1160        assert!(input_amount >= output_amount, "Transaction is underfunded");
1161
1162        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
1163
1164        Ok((tx, states, change_range))
1165    }
1166
1167    /// Add funding and/or change to the transaction builder as needed, finalize
1168    /// the transaction and submit it to the federation.
1169    ///
1170    /// ## Errors
1171    /// The function will return an error if the operation with given ID already
1172    /// exists.
1173    ///
1174    /// ## Panics
1175    /// The function will panic if the database transaction collides with
1176    /// other and fails with others too often, this should not happen except for
1177    /// excessively concurrent scenarios.
1178    pub async fn finalize_and_submit_transaction<F, M>(
1179        &self,
1180        operation_id: OperationId,
1181        operation_type: &str,
1182        operation_meta_gen: F,
1183        tx_builder: TransactionBuilder,
1184    ) -> anyhow::Result<OutPointRange>
1185    where
1186        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
1187        M: serde::Serialize + MaybeSend,
1188    {
1189        let operation_type = operation_type.to_owned();
1190
1191        let autocommit_res = self
1192            .db
1193            .autocommit(
1194                |dbtx, _| {
1195                    let operation_type = operation_type.clone();
1196                    let tx_builder = tx_builder.clone();
1197                    let operation_meta_gen = operation_meta_gen.clone();
1198                    Box::pin(async move {
1199                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
1200                            bail!("There already exists an operation with id {operation_id:?}")
1201                        }
1202
1203                        let out_point_range = self
1204                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
1205                            .await?;
1206
1207                        self.operation_log()
1208                            .add_operation_log_entry(
1209                                dbtx,
1210                                operation_id,
1211                                &operation_type,
1212                                operation_meta_gen(out_point_range),
1213                            )
1214                            .await;
1215
1216                        Ok(out_point_range)
1217                    })
1218                },
1219                Some(100), // TODO: handle what happens after 100 retries
1220            )
1221            .await;
1222
1223        match autocommit_res {
1224            Ok(txid) => Ok(txid),
1225            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
1226            Err(AutocommitError::CommitFailed {
1227                attempts,
1228                last_error,
1229            }) => panic!(
1230                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
1231            ),
1232        }
1233    }
1234
1235    async fn finalize_and_submit_transaction_inner(
1236        &self,
1237        dbtx: &mut DatabaseTransaction<'_>,
1238        operation_id: OperationId,
1239        tx_builder: TransactionBuilder,
1240    ) -> anyhow::Result<OutPointRange> {
1241        let (transaction, mut states, change_range) = self
1242            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
1243            .await?;
1244
1245        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
1246            let inputs = transaction
1247                .inputs
1248                .iter()
1249                .map(DynInput::module_instance_id)
1250                .collect::<Vec<_>>();
1251            let outputs = transaction
1252                .outputs
1253                .iter()
1254                .map(DynOutput::module_instance_id)
1255                .collect::<Vec<_>>();
1256            warn!(
1257                target: LOG_CLIENT_NET_API,
1258                size=%transaction.consensus_encode_to_vec().len(),
1259                ?inputs,
1260                ?outputs,
1261                "Transaction too large",
1262            );
1263            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
1264            bail!(
1265                "The generated transaction would be rejected by the federation for being too large."
1266            );
1267        }
1268
1269        let txid = transaction.tx_hash();
1270
1271        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
1272
1273        let tx_submission_sm = DynState::from_typed(
1274            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1275            TxSubmissionStatesSM {
1276                operation_id,
1277                state: TxSubmissionStates::Created(transaction),
1278            },
1279        );
1280        states.push(tx_submission_sm);
1281
1282        self.executor.add_state_machines_dbtx(dbtx, states).await?;
1283
1284        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
1285            .await;
1286
1287        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
1288    }
1289
    /// Subscribe to the state updates of the transaction submission state
    /// machines that belong to `operation_id`.
    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(TRANSACTION_SUBMISSION_MODULE_INSTANCE)
            .subscribe(operation_id)
            .await
    }
1300
1301    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
1302        let mut dbtx = self.db().begin_transaction_nc().await;
1303
1304        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
1305    }
1306
1307    pub async fn operation_exists_dbtx(
1308        dbtx: &mut DatabaseTransaction<'_>,
1309        operation_id: OperationId,
1310    ) -> bool {
1311        let active_state_exists = dbtx
1312            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1313            .await
1314            .next()
1315            .await
1316            .is_some();
1317
1318        let inactive_state_exists = dbtx
1319            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
1320            .await
1321            .next()
1322            .await
1323            .is_some();
1324
1325        active_state_exists || inactive_state_exists
1326    }
1327
1328    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
1329        self.db
1330            .begin_transaction_nc()
1331            .await
1332            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1333            .await
1334            .next()
1335            .await
1336            .is_some()
1337    }
1338
1339    /// Waits for an output from the primary module to reach its final
1340    /// state.
1341    pub async fn await_primary_module_output(
1342        &self,
1343        operation_id: OperationId,
1344        out_point: OutPoint,
1345    ) -> anyhow::Result<()> {
1346        self.primary_module()
1347            .await_primary_module_output(operation_id, out_point)
1348            .await
1349    }
1350
1351    /// Returns a reference to a typed module client instance by kind
1352    pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
1353        let module_kind = M::kind();
1354        let id = self
1355            .get_first_instance(&module_kind)
1356            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
1357        let module: &M = self
1358            .try_get_module(id)
1359            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
1360            .as_any()
1361            .downcast_ref::<M>()
1362            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
1363        let (db, _) = self.db().with_prefix_module_id(id);
1364        Ok(ClientModuleInstance {
1365            id,
1366            db,
1367            api: self.api().with_module(id),
1368            module,
1369        })
1370    }
1371
1372    pub fn get_module_client_dyn(
1373        &self,
1374        instance_id: ModuleInstanceId,
1375    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
1376        self.try_get_module(instance_id)
1377            .ok_or(anyhow!("Unknown module instance {}", instance_id))
1378    }
1379
    /// Borrow the client's database.
    pub fn db(&self) -> &Database {
        &self.db
    }
1383
1384    /// Returns a stream of transaction updates for the given operation id that
1385    /// can later be used to watch for a specific transaction being accepted.
1386    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1387        TransactionUpdates {
1388            update_stream: self.transaction_update_stream(operation_id).await,
1389        }
1390    }
1391
1392    /// Returns the instance id of the first module of the given kind. The
1393    /// primary module will always be returned before any other modules (which
1394    /// themselves are ordered by their instance ID).
1395    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1396        if self
1397            .modules
1398            .get_with_kind(self.primary_module_instance)
1399            .is_some_and(|(kind, _)| kind == module_kind)
1400        {
1401            return Some(self.primary_module_instance);
1402        }
1403
1404        self.modules
1405            .iter_modules()
1406            .find(|(_, kind, _module)| *kind == module_kind)
1407            .map(|(instance_id, _, _)| instance_id)
1408    }
1409
    /// Returns the data from which the client's root secret is derived (e.g.
    /// BIP39 seed phrase struct).
    ///
    /// The value is read from the client's database and decoded as `T`; the
    /// error of the underlying lookup is returned unchanged.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
1415
1416    /// Waits for outputs from the primary module to reach its final
1417    /// state.
1418    pub async fn await_primary_module_outputs(
1419        &self,
1420        operation_id: OperationId,
1421        outputs: Vec<OutPoint>,
1422    ) -> anyhow::Result<()> {
1423        for out_point in outputs {
1424            self.await_primary_module_output(operation_id, out_point)
1425                .await?;
1426        }
1427
1428        Ok(())
1429    }
1430
1431    /// Returns the config of the client in JSON format.
1432    ///
1433    /// Compared to the consensus module format where module configs are binary
1434    /// encoded this format cannot be cryptographically verified but is easier
1435    /// to consume and to some degree human-readable.
1436    pub async fn get_config_json(&self) -> JsonClientConfig {
1437        self.config().await.to_json()
1438    }
1439
1440    /// Get the primary module
1441    pub fn primary_module(&self) -> &DynClientModule {
1442        self.modules
1443            .get(self.primary_module_instance)
1444            .expect("primary module must be present")
1445    }
1446
1447    /// Balance available to the client for spending
1448    pub async fn get_balance(&self) -> Amount {
1449        self.primary_module()
1450            .get_balance(
1451                self.primary_module_instance,
1452                &mut self.db().begin_transaction_nc().await,
1453            )
1454            .await
1455    }
1456
1457    /// Returns a stream that yields the current client balance every time it
1458    /// changes.
1459    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
1460        let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
1461        let initial_balance = self.get_balance().await;
1462        let db = self.db().clone();
1463        let primary_module = self.primary_module().clone();
1464        let primary_module_instance = self.primary_module_instance;
1465
1466        Box::pin(stream! {
1467            yield initial_balance;
1468            let mut prev_balance = initial_balance;
1469            while let Some(()) = balance_changes.next().await {
1470                let mut dbtx = db.begin_transaction_nc().await;
1471                let balance = primary_module
1472                    .get_balance(primary_module_instance, &mut dbtx)
1473                    .await;
1474
1475                // Deduplicate in case modules cannot always tell if the balance actually changed
1476                if balance != prev_balance {
1477                    prev_balance = balance;
1478                    yield balance;
1479                }
1480            }
1481        })
1482    }
1483
1484    /// Query the federation for API version support and then calculate
1485    /// the best API version to use (supported by most guardians).
1486    pub async fn refresh_peers_api_versions(
1487        num_peers: NumPeers,
1488        api: DynGlobalApi,
1489        db: Database,
1490        num_responses_sender: watch::Sender<usize>,
1491    ) {
1492        // Make a single request to a peer after a delay
1493        //
1494        // The delay is here to unify the type of a future both for initial request and
1495        // possible retries.
1496        async fn make_request(
1497            delay: Duration,
1498            peer_id: PeerId,
1499            api: &DynGlobalApi,
1500        ) -> (
1501            PeerId,
1502            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
1503        ) {
1504            runtime::sleep(delay).await;
1505            (
1506                peer_id,
1507                api.request_single_peer::<SupportedApiVersionsSummary>(
1508                    VERSION_ENDPOINT.to_owned(),
1509                    ApiRequestErased::default(),
1510                    peer_id,
1511                )
1512                .await,
1513            )
1514        }
1515
1516        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
1517        // and make a single async db write operation, it should be OK.
1518        let mut requests = FuturesUnordered::new();
1519
1520        for peer_id in num_peers.peer_ids() {
1521            requests.push(make_request(Duration::ZERO, peer_id, &api));
1522        }
1523
1524        let mut num_responses = 0;
1525
1526        while let Some((peer_id, response)) = requests.next().await {
1527            match response {
1528                Err(err) => {
1529                    if db
1530                        .begin_transaction_nc()
1531                        .await
1532                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1533                        .await
1534                        .is_some()
1535                    {
1536                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
1537                    } else {
1538                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
1539                        requests.push(make_request(Duration::from_secs(15), peer_id, &api));
1540                    }
1541                }
1542                Ok(o) => {
1543                    // Save the response to the database right away, just to
1544                    // not lose it
1545                    let mut dbtx = db.begin_transaction().await;
1546                    dbtx.insert_entry(
1547                        &PeerLastApiVersionsSummaryKey(peer_id),
1548                        &PeerLastApiVersionsSummary(o),
1549                    )
1550                    .await;
1551                    dbtx.commit_tx().await;
1552                    num_responses += 1;
1553                    // ignore errors: we don't care if anyone is still listening
1554                    let _ = num_responses_sender.send(num_responses);
1555                }
1556            }
1557        }
1558    }
1559
1560    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1561    pub fn supported_api_versions_summary_static(
1562        config: &ClientConfig,
1563        client_module_init: &ClientModuleInitRegistry,
1564    ) -> SupportedApiVersionsSummary {
1565        SupportedApiVersionsSummary {
1566            core: SupportedCoreApiVersions {
1567                core_consensus: config.global.consensus_version,
1568                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1569                    .expect("must not have conflicting versions"),
1570            },
1571            modules: config
1572                .modules
1573                .iter()
1574                .filter_map(|(&module_instance_id, module_config)| {
1575                    client_module_init
1576                        .get(module_config.kind())
1577                        .map(|module_init| {
1578                            (
1579                                module_instance_id,
1580                                SupportedModuleApiVersions {
1581                                    core_consensus: config.global.consensus_version,
1582                                    module_consensus: module_config.version,
1583                                    api: module_init.supported_api_versions(),
1584                                },
1585                            )
1586                        })
1587                })
1588                .collect(),
1589        }
1590    }
1591
1592    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1593        Self::load_and_refresh_common_api_version_static(
1594            &self.config().await,
1595            &self.module_inits,
1596            &self.api,
1597            &self.db,
1598            &self.task_group,
1599        )
1600        .await
1601    }
1602
1603    /// Load the common api versions to use from cache and start a background
1604    /// process to refresh them.
1605    ///
1606    /// This is a compromise, so we not have to wait for version discovery to
1607    /// complete every time a [`Client`] is being built.
1608    async fn load_and_refresh_common_api_version_static(
1609        config: &ClientConfig,
1610        module_init: &ClientModuleInitRegistry,
1611        api: &DynGlobalApi,
1612        db: &Database,
1613        task_group: &TaskGroup,
1614    ) -> anyhow::Result<ApiVersionSet> {
1615        if let Some(v) = db
1616            .begin_transaction_nc()
1617            .await
1618            .get_value(&CachedApiVersionSetKey)
1619            .await
1620        {
1621            debug!(
1622                target: LOG_CLIENT,
1623                "Found existing cached common api versions"
1624            );
1625            let config = config.clone();
1626            let client_module_init = module_init.clone();
1627            let api = api.clone();
1628            let db = db.clone();
1629            let task_group = task_group.clone();
1630            // Separate task group, because we actually don't want to be waiting for this to
1631            // finish, and it's just best effort.
1632            task_group
1633                .clone()
1634                .spawn_cancellable("refresh_common_api_version_static", async move {
1635                    if let Err(error) = Self::refresh_common_api_version_static(
1636                        &config,
1637                        &client_module_init,
1638                        &api,
1639                        &db,
1640                        task_group,
1641                    )
1642                    .await
1643                    {
1644                        warn!(
1645                            target: LOG_CLIENT,
1646                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1647                        );
1648                    }
1649                });
1650
1651            return Ok(v.0);
1652        }
1653
1654        debug!(
1655            target: LOG_CLIENT,
1656            "No existing cached common api versions found, waiting for initial discovery"
1657        );
1658        Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
1659            .await
1660    }
1661
1662    async fn refresh_common_api_version_static(
1663        config: &ClientConfig,
1664        client_module_init: &ClientModuleInitRegistry,
1665        api: &DynGlobalApi,
1666        db: &Database,
1667        task_group: TaskGroup,
1668    ) -> anyhow::Result<ApiVersionSet> {
1669        debug!(
1670            target: LOG_CLIENT,
1671            "Refreshing common api versions"
1672        );
1673
1674        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1675        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1676
1677        task_group.spawn_cancellable("refresh peers api versions", {
1678            Client::refresh_peers_api_versions(
1679                num_peers,
1680                api.clone(),
1681                db.clone(),
1682                num_responses_sender,
1683            )
1684        });
1685
1686        // Wait at most 15 seconds before calculating a set of common api versions to
1687        // use. Note that all peers individual responses from previous attempts
1688        // are still being used, and requests, or even retries for response of
1689        // peers are not actually cancelled, as they are happening on a separate
1690        // task. This is all just to bound the time user can be waiting
1691        // for the join operation to finish, at the risk of picking wrong version in
1692        // very rare circumstances.
1693        let _: Result<_, Elapsed> = runtime::timeout(
1694            Duration::from_secs(15),
1695            num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1696        )
1697        .await;
1698
1699        let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1700
1701        let common_api_versions = discover_common_api_versions_set(
1702            &Self::supported_api_versions_summary_static(config, client_module_init),
1703            &peer_api_version_sets,
1704        )?;
1705
1706        debug!(
1707            target: LOG_CLIENT,
1708            value = ?common_api_versions,
1709            "Updating the cached common api versions"
1710        );
1711        let mut dbtx = db.begin_transaction().await;
1712        let _ = dbtx
1713            .insert_entry(
1714                &CachedApiVersionSetKey,
1715                &CachedApiVersionSet(common_api_versions.clone()),
1716            )
1717            .await;
1718
1719        dbtx.commit_tx().await;
1720
1721        Ok(common_api_versions)
1722    }
1723
1724    /// Get the client [`Metadata`]
1725    pub async fn get_metadata(&self) -> Metadata {
1726        self.db
1727            .begin_transaction_nc()
1728            .await
1729            .get_value(&ClientMetadataKey)
1730            .await
1731            .unwrap_or_else(|| {
1732                warn!(
1733                    target: LOG_CLIENT,
1734                    "Missing existing metadata. This key should have been set on Client init"
1735                );
1736                Metadata::empty()
1737            })
1738    }
1739
1740    /// Set the client [`Metadata`]
1741    pub async fn set_metadata(&self, metadata: &Metadata) {
1742        self.db
1743            .autocommit::<_, _, anyhow::Error>(
1744                |dbtx, _| {
1745                    Box::pin(async {
1746                        Self::set_metadata_dbtx(dbtx, metadata).await;
1747                        Ok(())
1748                    })
1749                },
1750                None,
1751            )
1752            .await
1753            .expect("Failed to autocommit metadata");
1754    }
1755
1756    pub fn has_pending_recoveries(&self) -> bool {
1757        !self
1758            .client_recovery_progress_receiver
1759            .borrow()
1760            .iter()
1761            .all(|(_id, progress)| progress.is_done())
1762    }
1763
1764    /// Wait for all module recoveries to finish
1765    ///
1766    /// This will block until the recovery task is done with recoveries.
1767    /// Returns success if all recovery tasks are complete (success case),
1768    /// or an error if some modules could not complete the recovery at the time.
1769    ///
1770    /// A bit of a heavy approach.
1771    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1772        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1773        recovery_receiver
1774            .wait_for(|in_progress| {
1775                in_progress
1776                    .iter()
1777                    .all(|(_id, progress)| progress.is_done())
1778            })
1779            .await
1780            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1781
1782        Ok(())
1783    }
1784
1785    /// Subscribe to recover progress for all the modules.
1786    ///
1787    /// This stream can contain duplicate progress for a module.
1788    /// Don't use this stream for detecting completion of recovery.
1789    pub fn subscribe_to_recovery_progress(
1790        &self,
1791    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
1792        WatchStream::new(self.client_recovery_progress_receiver.clone())
1793            .flat_map(futures::stream::iter)
1794    }
1795
1796    pub async fn wait_for_module_kind_recovery(
1797        &self,
1798        module_kind: ModuleKind,
1799    ) -> anyhow::Result<()> {
1800        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1801        let config = self.config().await;
1802        recovery_receiver
1803            .wait_for(|in_progress| {
1804                !in_progress
1805                    .iter()
1806                    .filter(|(module_instance_id, _progress)| {
1807                        config.modules[module_instance_id].kind == module_kind
1808                    })
1809                    .any(|(_id, progress)| !progress.is_done())
1810            })
1811            .await
1812            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1813
1814        Ok(())
1815    }
1816
1817    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1818        loop {
1819            if self.executor.get_active_states().await.is_empty() {
1820                break;
1821            }
1822            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1823        }
1824        Ok(())
1825    }
1826
    /// Set the client [`Metadata`] as part of an existing database
    /// transaction.
    ///
    /// Writes `metadata` under `ClientMetadataKey`; committing `dbtx` is left
    /// to the caller.
    ///
    /// NOTE(review): this goes through `insert_new_entry`, which presumably
    /// expects that no value was stored before - confirm what happens when
    /// the metadata is set a second time.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1831
1832    fn spawn_module_recoveries_task(
1833        &self,
1834        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1835        module_recoveries: BTreeMap<
1836            ModuleInstanceId,
1837            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1838        >,
1839        module_recovery_progress_receivers: BTreeMap<
1840            ModuleInstanceId,
1841            watch::Receiver<RecoveryProgress>,
1842        >,
1843    ) {
1844        let db = self.db.clone();
1845        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1846        self.task_group
1847            .spawn("module recoveries", |_task_handle| async {
1848                Self::run_module_recoveries_task(
1849                    db,
1850                    log_ordering_wakeup_tx,
1851                    recovery_sender,
1852                    module_recoveries,
1853                    module_recovery_progress_receivers,
1854                )
1855                .await;
1856            });
1857    }
1858
1859    async fn run_module_recoveries_task(
1860        db: Database,
1861        log_ordering_wakeup_tx: watch::Sender<()>,
1862        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1863        module_recoveries: BTreeMap<
1864            ModuleInstanceId,
1865            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1866        >,
1867        module_recovery_progress_receivers: BTreeMap<
1868            ModuleInstanceId,
1869            watch::Receiver<RecoveryProgress>,
1870        >,
1871    ) {
1872        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1873        let mut completed_stream = Vec::new();
1874        let progress_stream = futures::stream::FuturesUnordered::new();
1875
1876        for (module_instance_id, f) in module_recoveries {
1877            completed_stream.push(futures::stream::once(Box::pin(async move {
1878                match f.await {
1879                    Ok(()) => (module_instance_id, None),
1880                    Err(err) => {
1881                        warn!(
1882                            target: LOG_CLIENT,
1883                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1884                        );
1885                        // a module recovery that failed reports and error and
1886                        // just never finishes, so we don't need a separate state
1887                        // for it
1888                        futures::future::pending::<()>().await;
1889                        unreachable!()
1890                    }
1891                }
1892            })));
1893        }
1894
1895        for (module_instance_id, rx) in module_recovery_progress_receivers {
1896            progress_stream.push(
1897                tokio_stream::wrappers::WatchStream::new(rx)
1898                    .fuse()
1899                    .map(move |progress| (module_instance_id, Some(progress))),
1900            );
1901        }
1902
1903        let mut futures = futures::stream::select(
1904            futures::stream::select_all(progress_stream),
1905            futures::stream::select_all(completed_stream),
1906        );
1907
1908        while let Some((module_instance_id, progress)) = futures.next().await {
1909            let mut dbtx = db.begin_transaction().await;
1910
1911            let prev_progress = *recovery_sender
1912                .borrow()
1913                .get(&module_instance_id)
1914                .expect("existing progress must be present");
1915
1916            let progress = if prev_progress.is_done() {
1917                // since updates might be out of order, once done, stick with it
1918                prev_progress
1919            } else if let Some(progress) = progress {
1920                progress
1921            } else {
1922                prev_progress.to_complete()
1923            };
1924
1925            if !prev_progress.is_done() && progress.is_done() {
1926                info!(
1927                    target: LOG_CLIENT,
1928                    module_instance_id,
1929                    progress = format!("{}/{}", progress.complete, progress.total),
1930                    "Recovery complete"
1931                );
1932                dbtx.log_event(
1933                    log_ordering_wakeup_tx.clone(),
1934                    None,
1935                    ModuleRecoveryCompleted {
1936                        module_id: module_instance_id,
1937                    },
1938                )
1939                .await;
1940            } else {
1941                info!(
1942                    target: LOG_CLIENT,
1943                    module_instance_id,
1944                    progress = format!("{}/{}", progress.complete, progress.total),
1945                    "Recovery progress"
1946                );
1947            }
1948
1949            dbtx.insert_entry(
1950                &ClientModuleRecovery { module_instance_id },
1951                &ClientModuleRecoveryState { progress },
1952            )
1953            .await;
1954            dbtx.commit_tx().await;
1955
1956            recovery_sender.send_modify(|v| {
1957                v.insert(module_instance_id, progress);
1958            });
1959        }
1960        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1961    }
1962
1963    async fn load_peers_last_api_versions(
1964        db: &Database,
1965        num_peers: NumPeers,
1966    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1967        let mut peer_api_version_sets = BTreeMap::new();
1968
1969        let mut dbtx = db.begin_transaction_nc().await;
1970        for peer_id in num_peers.peer_ids() {
1971            if let Some(v) = dbtx
1972                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1973                .await
1974            {
1975                peer_api_version_sets.insert(peer_id, v.0);
1976            }
1977        }
1978        drop(dbtx);
1979        peer_api_version_sets
1980    }
1981
1982    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1983    /// only the announcements and doesn't use the config as fallback.
1984    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1985        self.db()
1986            .begin_transaction_nc()
1987            .await
1988            .find_by_prefix(&ApiAnnouncementPrefix)
1989            .await
1990            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1991            .collect()
1992            .await
1993    }
1994
1995    /// Returns a list of guardian API URLs
1996    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1997        get_api_urls(&self.db, &self.config().await).await
1998    }
1999
2000    /// Create an invite code with the api endpoint of the given peer which can
2001    /// be used to download this client config
2002    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2003        self.get_peer_urls()
2004            .await
2005            .into_iter()
2006            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
2007            .map(|peer_url| {
2008                InviteCode::new(
2009                    peer_url.clone(),
2010                    peer,
2011                    self.federation_id(),
2012                    self.api_secret.clone(),
2013                )
2014            })
2015    }
2016
    /// Blocks till the client has synced the guardian public key set
    /// (introduced in version 0.4) and returns it. Once it has been fetched
    /// once this function is guaranteed to return immediately.
    ///
    /// If the locally stored config already contains the keys they are
    /// returned as is. Otherwise the client config is requested from the
    /// federation (retrying with a background backoff), the keys are merged
    /// into the local config, and the updated config is written to both the
    /// database and the in-memory copy.
    ///
    /// If the fetched config has no keys either, a warning is logged and this
    /// function never returns.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db.autocommit(|dbtx, _| Box::pin(async move {
            let config = self.config().await;

            // Fast path: keys are already part of the local config.
            let guardian_pub_keys = if let Some(guardian_pub_keys) = config.global.broadcast_public_keys {guardian_pub_keys}else{
                let fetched_config = retry(
                    "Fetching guardian public keys",
                    backoff_util::background_backoff(),
                    || async {
                        Ok(self.api.request_current_consensus::<ClientConfig>(
                            CLIENT_CONFIG_ENDPOINT.to_owned(),
                            ApiRequestErased::default(),
                        ).await?)
                    },
                )
                .await
                .expect("Will never return on error");

                let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
                    warn!(
                        target: LOG_CLIENT,
                        "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
                    );
                    // Park forever: there is nothing to return to the caller.
                    pending::<()>().await;
                    unreachable!("Pending will never return");
                };

                // Only the key set is taken from the fetched config; everything
                // else is kept from the local config.
                let new_config = ClientConfig {
                    global: GlobalClientConfig {
                        broadcast_public_keys: Some(guardian_pub_keys.clone()),
                        ..config.global
                    },
                    modules: config.modules,
                };

                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
                // NOTE(review): the in-memory config is replaced inside the
                // autocommit closure, i.e. before the surrounding transaction
                // is committed - confirm this ordering is intended.
                *(self.config.write().await) = new_config;
                guardian_pub_keys
            };

            Result::<_, ()>::Ok(guardian_pub_keys)
        }), None).await.expect("Will retry forever")
    }
2065
2066    pub fn handle_global_rpc(
2067        &self,
2068        method: String,
2069        params: serde_json::Value,
2070    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
2071        Box::pin(try_stream! {
2072            match method.as_str() {
2073                "get_balance" => {
2074                    let balance = self.get_balance().await;
2075                    yield serde_json::to_value(balance)?;
2076                }
2077                "subscribe_balance_changes" => {
2078                    let mut stream = self.subscribe_balance_changes().await;
2079                    while let Some(balance) = stream.next().await {
2080                        yield serde_json::to_value(balance)?;
2081                    }
2082                }
2083                "get_config" => {
2084                    let config = self.config().await;
2085                    yield serde_json::to_value(config)?;
2086                }
2087                "get_federation_id" => {
2088                    let federation_id = self.federation_id();
2089                    yield serde_json::to_value(federation_id)?;
2090                }
2091                "get_invite_code" => {
2092                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
2093                    let invite_code = self.invite_code(req.peer).await;
2094                    yield serde_json::to_value(invite_code)?;
2095                }
2096                "list_operations" => {
2097                    // TODO: support pagination
2098                    let operations = self.operation_log().paginate_operations_rev(usize::MAX, None).await;
2099                    yield serde_json::to_value(operations)?;
2100                }
2101                "has_pending_recoveries" => {
2102                    let has_pending = self.has_pending_recoveries();
2103                    yield serde_json::to_value(has_pending)?;
2104                }
2105                "wait_for_all_recoveries" => {
2106                    self.wait_for_all_recoveries().await?;
2107                    yield serde_json::Value::Null;
2108                }
2109                "subscribe_to_recovery_progress" => {
2110                    let mut stream = self.subscribe_to_recovery_progress();
2111                    while let Some((module_id, progress)) = stream.next().await {
2112                        yield serde_json::json!({
2113                            "module_id": module_id,
2114                            "progress": progress
2115                        });
2116                    }
2117                }
2118                _ => {
2119                    Err(anyhow::format_err!("Unknown method: {}", method))?;
2120                    unreachable!()
2121                },
2122            }
2123        })
2124    }
2125
2126    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
2127    where
2128        E: Event + Send,
2129    {
2130        let mut dbtx = self.db.begin_transaction().await;
2131        self.log_event_dbtx(&mut dbtx, module_id, event).await;
2132        dbtx.commit_tx().await;
2133    }
2134
2135    pub async fn log_event_dbtx<E, Cap>(
2136        &self,
2137        dbtx: &mut DatabaseTransaction<'_, Cap>,
2138        module_id: Option<ModuleInstanceId>,
2139        event: E,
2140    ) where
2141        E: Event + Send,
2142        Cap: Send,
2143    {
2144        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
2145            .await;
2146    }
2147
2148    pub async fn log_event_raw_dbtx<Cap>(
2149        &self,
2150        dbtx: &mut DatabaseTransaction<'_, Cap>,
2151        kind: EventKind,
2152        module: Option<(ModuleKind, ModuleInstanceId)>,
2153        payload: Vec<u8>,
2154        persist: bool,
2155    ) where
2156        Cap: Send,
2157    {
2158        let module_id = module.as_ref().map(|m| m.1);
2159        let module_kind = module.map(|m| m.0);
2160        dbtx.log_event_raw(
2161            self.log_ordering_wakeup_tx.clone(),
2162            kind,
2163            module_kind,
2164            module_id,
2165            payload,
2166            persist,
2167        )
2168        .await;
2169    }
2170
2171    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
2172    where
2173        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
2174        K: DatabaseRecord<Value = EventLogId>,
2175        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2176        R: Future<Output = anyhow::Result<()>>,
2177    {
2178        fedimint_eventlog::handle_events(
2179            self.db.clone(),
2180            pos_key,
2181            self.log_event_added_rx.clone(),
2182            call_fn,
2183        )
2184        .await
2185    }
2186
2187    pub async fn get_event_log(
2188        &self,
2189        pos: Option<EventLogId>,
2190        limit: u64,
2191    ) -> Vec<(
2192        EventLogId,
2193        EventKind,
2194        Option<(ModuleKind, ModuleInstanceId)>,
2195        u64,
2196        serde_json::Value,
2197    )> {
2198        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2199            .await
2200    }
2201
    /// Reads event log entries through the given database transaction.
    ///
    /// Thin wrapper around the transaction's `get_event_log`; `pos` and
    /// `limit` are passed through unchanged.
    ///
    /// NOTE(review): presumably `pos` is the position to start reading from
    /// and `limit` the maximum number of entries - confirm against
    /// `get_event_log` of the event log crate.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<(
        EventLogId,
        EventKind,
        Option<(ModuleKind, ModuleInstanceId)>,
        u64,
        serde_json::Value,
    )>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2219
    /// Register to receive all new transient (unpersisted) events
    ///
    /// Every call creates a new subscription to the client's transient event
    /// broadcast channel.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2224}
2225
/// [`ClientContextIface`] implementation for [`Client`].
///
/// Apart from `log_event_json`, every method forwards to the `Client`
/// function of the same name.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    /// Forwards to `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    /// Forwards to `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    /// Forwards to `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    /// Forwards to `Client::finalize_and_submit_transaction`.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            // The boxed meta generator is handed over by reference.
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    /// Forwards to `Client::finalize_and_submit_transaction_inner`.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    /// Forwards to `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    /// Forwards to `Client::await_primary_module_outputs`.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_module_outputs(self, operation_id, outputs).await
    }

    /// Forwards to `Client::operation_log`.
    fn operation_log(&self) -> &OperationLog {
        Client::operation_log(self)
    }

    /// Forwards to `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    /// Forwards to `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    /// Forwards to `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    /// Forwards to `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    /// Forwards to `Client::executor`.
    fn executor(&self) -> &Executor {
        Client::executor(self)
    }

    /// Forwards to `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    /// Forwards to `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    /// Logs an event given as JSON through `Client::log_event_raw_dbtx`.
    ///
    /// The payload is serialized to bytes first. The module is only attached
    /// to the event if `module_kind` is `Some`; in that case it is paired
    /// with `module_id`.
    ///
    /// ## Panics
    /// If `dbtx` is not a global database transaction
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }
}
/// Parameters of the `get_invite_code` global RPC method, see
/// [`Client::handle_global_rpc`].
#[derive(Deserialize)]
struct GetInviteCodeRequest {
    /// Peer whose API endpoint the invite code is created for
    peer: PeerId,
}
2336
/// See [`Client::transaction_updates`]
///
/// Wraps a stream of transaction submission state machine states that can
/// be awaited for a specific transaction using
/// [`TransactionUpdates::await_tx_accepted`].
pub struct TransactionUpdates {
    /// Transaction submission states, in the order the stream delivers them
    update_stream: BoxStream<'static, TxSubmissionStatesSM>,
}
2341
2342impl TransactionUpdates {
2343    /// Waits for the transaction to be accepted or rejected as part of the
2344    /// operation to which the `TransactionUpdates` object is subscribed.
2345    pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
2346        debug!(target: LOG_CLIENT, %await_txid, "Await tx accepted");
2347        self.update_stream
2348            .filter_map(|tx_update| {
2349                std::future::ready(match tx_update.state {
2350                    TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
2351                    TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
2352                        Some(Err(submit_error))
2353                    }
2354                    _ => None,
2355                })
2356            })
2357            .next_or_pending()
2358            .await?;
2359        debug!(target: LOG_CLIENT, %await_txid, "Tx accepted");
2360        Ok(())
2361    }
2362}
2363
/// Admin (guardian) identification and authentication
///
/// Optionally held by a [`ClientBuilder`].
pub struct AdminCreds {
    /// Guardian's own `peer_id`
    pub peer_id: PeerId,
    /// Authentication details
    pub auth: ApiAuth,
}
2371
/// Used to configure, assemble and build [`Client`]
pub struct ClientBuilder {
    /// Module init registry; replaced by `with_module_inits`, extended by
    /// `with_module`
    module_inits: ClientModuleInitRegistry,
    /// Primary module chosen by instance id, see
    /// `with_primary_module_instance_id`
    primary_module_instance: Option<ModuleInstanceId>,
    /// Primary module chosen by module kind, see `with_primary_module_kind`
    primary_module_kind: Option<ModuleKind>,
    /// Optional admin (guardian) credentials
    admin_creds: Option<AdminCreds>,
    /// Database handle the client is built on; when created from an existing
    /// client it is that client's database with a default (empty) decoder
    /// registry
    db_no_decoders: Database,
    /// Meta service, replaceable through `with_meta_service`
    meta_service: Arc<MetaService>,
    /// Connector setting; `Connector::default()` unless taken over from an
    /// existing client
    connector: Connector,
    /// Flag set by `stopped`
    stopped: bool,
    /// Sender side of the broadcast channel for transient events; either
    /// freshly created or shared with the client the builder was created
    /// from
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
}
2384
2385impl ClientBuilder {
2386    fn new(db: Database) -> Self {
2387        let meta_service = MetaService::new(LegacyMetaSource::default());
2388        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
2389            broadcast::channel(1024);
2390        ClientBuilder {
2391            module_inits: ModuleInitRegistry::new(),
2392            primary_module_instance: None,
2393            primary_module_kind: None,
2394            connector: Connector::default(),
2395            admin_creds: None,
2396            db_no_decoders: db,
2397            stopped: false,
2398            meta_service,
2399            log_event_added_transient_tx,
2400        }
2401    }
2402
2403    fn from_existing(client: &Client) -> Self {
2404        ClientBuilder {
2405            module_inits: client.module_inits.clone(),
2406            primary_module_instance: Some(client.primary_module_instance),
2407            primary_module_kind: None,
2408            admin_creds: None,
2409            db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
2410            stopped: false,
2411            // non unique
2412            meta_service: client.meta_service.clone(),
2413            connector: client.connector,
2414            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
2415        }
2416    }
2417
    /// Replace module generator registry entirely
    ///
    /// Anything attached to the builder before (e.g. through
    /// [`ClientBuilder::with_module`]) is discarded.
    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
        self.module_inits = module_inits;
    }
2422
    /// Make module generator available when reading the config
    ///
    /// Attaches `module_init` to the builder's module init registry.
    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
        self.module_inits.attach(module_init);
    }
2427
    /// Sets the builder's `stopped` flag.
    ///
    /// NOTE(review): presumably a client built with this flag set does not
    /// start running on its own - confirm where the flag is consumed.
    pub fn stopped(&mut self) {
        self.stopped = true;
    }
2431
    /// Uses this module with the given instance id as the primary module. See
    /// [`ClientModule::supports_being_primary`] for more information.
    ///
    /// Deprecated alias that forwards to
    /// [`ClientBuilder::with_primary_module_instance_id`].
    ///
    /// ## Panics
    /// If there was a primary module specified previously
    #[deprecated(
        since = "0.6.0",
        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
    )]
    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
        self.with_primary_module_instance_id(primary_module_instance);
    }
2444
2445    /// **You are likely looking for
2446    /// [`ClientBuilder::with_primary_module_kind`]. This function is rarely
2447    /// useful and often dangerous, handle with care.**
2448    ///
2449    /// Uses this module with the given instance id as the primary module. See
2450    /// [`ClientModule::supports_being_primary`] for more information. Since the
2451    /// module instance id of modules of a specific kind may differ between
2452    /// different federations it is generally not recommended to specify it, but
2453    /// rather to specify the module kind that should be used as primary. See
2454    /// [`ClientBuilder::with_primary_module_kind`].
2455    ///
2456    /// ## Panics
2457    /// If there was a primary module specified previously
2458    pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
2459        let was_replaced = self
2460            .primary_module_instance
2461            .replace(primary_module_instance)
2462            .is_some();
2463        assert!(
2464            !was_replaced,
2465            "Only one primary module can be given to the builder."
2466        );
2467    }
2468
2469    /// Uses this module kind as the primary module if present in the config.
2470    /// See [`ClientModule::supports_being_primary`] for more information.
2471    ///
2472    /// ## Panics
2473    /// If there was a primary module kind specified previously
2474    pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
2475        let was_replaced = self
2476            .primary_module_kind
2477            .replace(primary_module_kind)
2478            .is_some();
2479        assert!(
2480            !was_replaced,
2481            "Only one primary module kind can be given to the builder."
2482        );
2483    }
2484
    /// Replace the [`MetaService`] that will be handed to the client being
    /// built.
    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
        self.meta_service = meta_service;
    }
2488
2489    async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> {
2490        // Only apply the client database migrations if the database has been
2491        // initialized.
2492        // This only works as long as you don't change the client config
2493        if let Ok(client_config) = self.load_existing_config().await {
2494            for (module_id, module_cfg) in client_config.modules {
2495                let kind = module_cfg.kind.clone();
2496                let Some(init) = self.module_inits.get(&kind) else {
2497                    // normal, expected and already logged about when building the client
2498                    continue;
2499                };
2500
2501                apply_migrations_client(
2502                    db,
2503                    kind.to_string(),
2504                    init.get_database_migrations(),
2505                    module_id,
2506                )
2507                .await?;
2508            }
2509        }
2510
2511        Ok(())
2512    }
2513
    /// Returns a reference to the builder's `db_no_decoders` database handle
    /// (as the name indicates, the handle without module decoders attached).
    pub fn db_no_decoders(&self) -> &Database {
        &self.db_no_decoders
    }
2517
2518    pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
2519        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2520            bail!("Client database not initialized")
2521        };
2522
2523        Ok(config)
2524    }
2525
    /// Set the admin credentials to build the client with.
    ///
    /// When credentials are present, the client is built with an admin API
    /// connection to the peer named in `creds`, and the contained auth is
    /// passed on to the modules.
    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
        self.admin_creds = Some(creds);
    }
2529
    /// Set the [`Connector`] used when creating the federation API
    /// connections.
    pub fn with_connector(&mut self, connector: Connector) {
        self.connector = connector;
    }
2533
    /// Shorthand for [`ClientBuilder::with_connector`] with
    /// `Connector::tor()`. Only available with the `tor` feature enabled.
    #[cfg(feature = "tor")]
    pub fn with_tor_connector(&mut self) {
        self.with_connector(Connector::tor());
    }
2538
2539    async fn init(
2540        self,
2541        pre_root_secret: DerivableSecret,
2542        config: ClientConfig,
2543        api_secret: Option<String>,
2544        init_mode: InitMode,
2545    ) -> anyhow::Result<ClientHandle> {
2546        if Client::is_initialized(&self.db_no_decoders).await {
2547            bail!("Client database already initialized")
2548        }
2549
2550        // Note: It's important all client initialization is performed as one big
2551        // transaction to avoid half-initialized client state.
2552        {
2553            debug!(target: LOG_CLIENT, "Initializing client database");
2554            let mut dbtx = self.db_no_decoders.begin_transaction().await;
2555            // Save config to DB
2556            dbtx.insert_new_entry(&ClientConfigKey, &config).await;
2557            dbtx.insert_entry(
2558                &ClientPreRootSecretHashKey,
2559                &pre_root_secret.derive_pre_root_secret_hash(),
2560            )
2561            .await;
2562
2563            if let Some(api_secret) = api_secret.as_ref() {
2564                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
2565            }
2566
2567            let init_state = InitState::Pending(init_mode);
2568            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
2569
2570            let metadata = init_state
2571                .does_require_recovery()
2572                .flatten()
2573                .map_or(Metadata::empty(), |s| s.metadata);
2574
2575            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
2576
2577            dbtx.commit_tx_result().await?;
2578        }
2579
2580        let stopped = self.stopped;
2581        self.build(pre_root_secret, config, api_secret, stopped)
2582            .await
2583    }
2584
2585    /// Join a new Federation
2586    ///
2587    /// When a user wants to connect to a new federation this function fetches
2588    /// the federation config and initializes the client database. If a user
2589    /// already joined the federation in the past and has a preexisting database
2590    /// use [`ClientBuilder::open`] instead.
2591    ///
2592    /// **Warning**: Calling `join` with a `root_secret` key that was used
2593    /// previous to `join` a Federation will lead to all sorts of malfunctions
2594    /// including likely loss of funds.
2595    ///
2596    /// This should be generally called only if the `root_secret` key is known
2597    /// not to have been used before (e.g. just randomly generated). For keys
2598    /// that might have been previous used (e.g. provided by the user),
2599    /// it's safer to call [`Self::recover`] which will attempt to recover
2600    /// client module states for the Federation.
2601    ///
2602    /// A typical "join federation" flow would look as follows:
2603    /// ```no_run
2604    /// # use std::str::FromStr;
2605    /// # use fedimint_core::invite_code::InviteCode;
2606    /// # use fedimint_core::config::ClientConfig;
2607    /// # use fedimint_derive_secret::DerivableSecret;
2608    /// # use fedimint_client::{Client, ClientBuilder};
2609    /// # use fedimint_core::db::Database;
2610    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
2611    /// #
2612    /// # #[tokio::main]
2613    /// # async fn main() {
2614    /// # let root_secret: DerivableSecret = unimplemented!();
2615    /// // Create a root secret, e.g. via fedimint-bip39, see also:
2616    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
2617    /// // let root_secret = …;
2618    ///
2619    /// // Get invite code from user
2620    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
2621    ///     .expect("Invalid invite code");
2622    /// let config = fedimint_api_client::api::net::Connector::default().download_from_invite_code(&invite_code).await
2623    ///     .expect("Error downloading config");
2624    ///
2625    /// // Tell the user the federation name, bitcoin network
2626    /// // (e.g. from wallet module config), and other details
2627    /// // that are typically contained in the federation's
2628    /// // meta fields.
2629    ///
2630    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
2631    /// //     .expect("Module not found")
2632    /// //     .network;
2633    ///
2634    /// println!(
2635    ///     "The federation name is: {}",
2636    ///     config.meta::<String>(META_FEDERATION_NAME_KEY)
2637    ///         .expect("Could not decode name field")
2638    ///         .expect("Name isn't set")
2639    /// );
2640    ///
2641    /// // Open the client's database, using the federation ID
2642    /// // as the DB name is a common pattern:
2643    ///
2644    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
2645    /// // let db = RocksDb::open(db_path).expect("error opening DB");
2646    /// # let db: Database = unimplemented!();
2647    ///
2648    /// let client = Client::builder(db).await.expect("Error building client")
2649    ///     // Mount the modules the client should support:
2650    ///     // .with_module(LightningClientInit)
2651    ///     // .with_module(MintClientInit)
2652    ///     // .with_module(WalletClientInit::default())
2653    ///     .join(root_secret, config, None)
2654    ///     .await
2655    ///     .expect("Error joining federation");
2656    /// # }
2657    /// ```
2658    pub async fn join(
2659        self,
2660        pre_root_secret: DerivableSecret,
2661        config: ClientConfig,
2662        api_secret: Option<String>,
2663    ) -> anyhow::Result<ClientHandle> {
2664        self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
2665            .await
2666    }
2667
2668    /// Download most recent valid backup found from the Federation
2669    pub async fn download_backup_from_federation(
2670        &self,
2671        root_secret: &DerivableSecret,
2672        config: &ClientConfig,
2673        api_secret: Option<String>,
2674    ) -> anyhow::Result<Option<ClientBackup>> {
2675        let connector = self.connector;
2676        let api = DynGlobalApi::from_endpoints(
2677            // TODO: change join logic to use FederationId v2
2678            config
2679                .global
2680                .api_endpoints
2681                .iter()
2682                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
2683            &api_secret,
2684            &connector,
2685        );
2686        Client::download_backup_from_federation_static(
2687            &api,
2688            &Self::federation_root_secret(root_secret, config),
2689            &self.decoders(config),
2690        )
2691        .await
2692    }
2693
2694    /// Join a (possibly) previous joined Federation
2695    ///
2696    /// Unlike [`Self::join`], `recover` will run client module recovery for
2697    /// each client module attempting to recover any previous module state.
2698    ///
2699    /// Recovery process takes time during which each recovering client module
2700    /// will not be available for use.
2701    ///
2702    /// Calling `recovery` with a `root_secret` that was not actually previous
2703    /// used in a given Federation is safe.
2704    pub async fn recover(
2705        self,
2706        root_secret: DerivableSecret,
2707        config: ClientConfig,
2708        api_secret: Option<String>,
2709        backup: Option<ClientBackup>,
2710    ) -> anyhow::Result<ClientHandle> {
2711        let client = self
2712            .init(
2713                root_secret,
2714                config,
2715                api_secret,
2716                InitMode::Recover {
2717                    snapshot: backup.clone(),
2718                },
2719            )
2720            .await?;
2721
2722        Ok(client)
2723    }
2724
2725    pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
2726        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2727            bail!("Client database not initialized")
2728        };
2729
2730        if let Some(secret_hash) = self
2731            .db_no_decoders()
2732            .begin_transaction_nc()
2733            .await
2734            .get_value(&ClientPreRootSecretHashKey)
2735            .await
2736        {
2737            ensure!(
2738                pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
2739                "Secret hash does not match. Incorrect secret"
2740            );
2741        } else {
2742            debug!(target: LOG_CLIENT, "Backfilling secret hash");
2743            // Note: no need for dbtx autocommit, we are the only writer ATM
2744            let mut dbtx = self.db_no_decoders.begin_transaction().await;
2745            dbtx.insert_entry(
2746                &ClientPreRootSecretHashKey,
2747                &pre_root_secret.derive_pre_root_secret_hash(),
2748            )
2749            .await;
2750            dbtx.commit_tx().await;
2751        }
2752
2753        let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
2754        let stopped = self.stopped;
2755
2756        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2757        let client = self
2758            .build_stopped(
2759                pre_root_secret,
2760                &config,
2761                api_secret,
2762                log_event_added_transient_tx,
2763            )
2764            .await?;
2765        if !stopped {
2766            client.as_inner().start_executor();
2767        }
2768        Ok(client)
2769    }
2770
2771    /// Build a [`Client`] and start the executor
2772    async fn build(
2773        self,
2774        pre_root_secret: DerivableSecret,
2775        config: ClientConfig,
2776        api_secret: Option<String>,
2777        stopped: bool,
2778    ) -> anyhow::Result<ClientHandle> {
2779        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2780        let client = self
2781            .build_stopped(
2782                pre_root_secret,
2783                &config,
2784                api_secret,
2785                log_event_added_transient_tx,
2786            )
2787            .await?;
2788        if !stopped {
2789            client.as_inner().start_executor();
2790        }
2791
2792        Ok(client)
2793    }
2794
2795    // TODO: remove config argument
2796    /// Build a [`Client`] but do not start the executor
2797    async fn build_stopped(
2798        self,
2799        root_secret: DerivableSecret,
2800        config: &ClientConfig,
2801        api_secret: Option<String>,
2802        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
2803    ) -> anyhow::Result<ClientHandle> {
2804        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
2805        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
2806
2807        let decoders = self.decoders(config);
2808        let config = Self::config_decoded(config, &decoders)?;
2809        let fed_id = config.calculate_federation_id();
2810        let db = self.db_no_decoders.with_decoders(decoders.clone());
2811        let connector = self.connector;
2812        let peer_urls = get_api_urls(&db, &config).await;
2813        let api = if let Some(admin_creds) = self.admin_creds.as_ref() {
2814            ReconnectFederationApi::new_admin(
2815                admin_creds.peer_id,
2816                peer_urls
2817                    .into_iter()
2818                    .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
2819                    .context("Admin creds should match a peer")?,
2820                &api_secret,
2821                &connector,
2822            )
2823            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2824            .with_cache()
2825            .into()
2826        } else {
2827            ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, &connector, None)
2828                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2829                .with_cache()
2830                .into()
2831        };
2832        let task_group = TaskGroup::new();
2833
2834        // Migrate the database before interacting with it in case any on-disk data
2835        // structures have changed.
2836        self.migrate_database(&db).await?;
2837
2838        let init_state = Self::load_init_state(&db).await;
2839
2840        let primary_module_instance = self
2841            .primary_module_instance
2842            .or_else(|| {
2843                let primary_module_kind = self.primary_module_kind?;
2844                config
2845                    .modules
2846                    .iter()
2847                    .find_map(|(module_instance_id, module_config)| {
2848                        (module_config.kind() == &primary_module_kind)
2849                            .then_some(*module_instance_id)
2850                    })
2851            })
2852            .ok_or(anyhow!("No primary module set or found"))?;
2853
2854        let notifier = Notifier::new(db.clone());
2855
2856        let common_api_versions = Client::load_and_refresh_common_api_version_static(
2857            &config,
2858            &self.module_inits,
2859            &api,
2860            &db,
2861            &task_group,
2862        )
2863        .await
2864        .inspect_err(|err| {
2865            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
2866        })
2867        .unwrap_or(ApiVersionSet {
2868            core: ApiVersion::new(0, 0),
2869            // This will cause all modules to skip initialization
2870            modules: BTreeMap::new(),
2871        });
2872
2873        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
2874
2875        let mut module_recoveries: BTreeMap<
2876            ModuleInstanceId,
2877            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
2878        > = BTreeMap::new();
2879        let mut module_recovery_progress_receivers: BTreeMap<
2880            ModuleInstanceId,
2881            watch::Receiver<RecoveryProgress>,
2882        > = BTreeMap::new();
2883
2884        let final_client = FinalClientIface::default();
2885
2886        let root_secret = Self::federation_root_secret(&root_secret, &config);
2887
2888        let modules = {
2889            let mut modules = ClientModuleRegistry::default();
2890            for (module_instance_id, module_config) in config.modules.clone() {
2891                let kind = module_config.kind().clone();
2892                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
2893                    debug!(
2894                        target: LOG_CLIENT,
2895                        kind=%kind,
2896                        instance_id=%module_instance_id,
2897                        "Module kind of instance not found in module gens, skipping");
2898                    continue;
2899                };
2900
2901                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
2902                else {
2903                    warn!(
2904                        target: LOG_CLIENT,
2905                        kind=%kind,
2906                        instance_id=%module_instance_id,
2907                        "Module kind of instance has incompatible api version, skipping"
2908                    );
2909                    continue;
2910                };
2911
2912                // since the exact logic of when to start recovery is a bit gnarly,
2913                // the recovery call is extracted here.
2914                let start_module_recover_fn =
2915                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
2916                        let module_config = module_config.clone();
2917                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
2918                        let db = db.clone();
2919                        let kind = kind.clone();
2920                        let notifier = notifier.clone();
2921                        let api = api.clone();
2922                        let root_secret = root_secret.clone();
2923                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
2924                        let final_client = final_client.clone();
2925                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
2926                        let task_group = task_group.clone();
2927                        let module_init = module_init.clone();
2928                        (
2929                            Box::pin(async move {
2930                                module_init
2931                                    .recover(
2932                                        final_client.clone(),
2933                                        fed_id,
2934                                        num_peers,
2935                                        module_config.clone(),
2936                                        db.clone(),
2937                                        module_instance_id,
2938                                        common_api_versions.core,
2939                                        api_version,
2940                                        root_secret.derive_module_secret(module_instance_id),
2941                                        notifier.clone(),
2942                                        api.clone(),
2943                                        admin_auth,
2944                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
2945                                        progress_tx,
2946                                        task_group,
2947                                    )
2948                                    .await
2949                                    .inspect_err(|err| {
2950                                        warn!(
2951                                            target: LOG_CLIENT,
2952                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
2953                                        );
2954                                    })
2955                            }),
2956                            progress_rx,
2957                        )
2958                    };
2959
2960                let recovery = if let Some(snapshot) = init_state.does_require_recovery() {
2961                    if let Some(module_recovery_state) = db
2962                        .begin_transaction_nc()
2963                        .await
2964                        .get_value(&ClientModuleRecovery { module_instance_id })
2965                        .await
2966                    {
2967                        if module_recovery_state.is_done() {
2968                            debug!(
2969                                id = %module_instance_id,
2970                                %kind, "Module recovery already complete"
2971                            );
2972                            None
2973                        } else {
2974                            debug!(
2975                                id = %module_instance_id,
2976                                %kind,
2977                                progress = %module_recovery_state.progress,
2978                                "Starting module recovery with an existing progress"
2979                            );
2980                            Some(start_module_recover_fn(
2981                                snapshot,
2982                                module_recovery_state.progress,
2983                            ))
2984                        }
2985                    } else {
2986                        let progress = RecoveryProgress::none();
2987                        let mut dbtx = db.begin_transaction().await;
2988                        dbtx.log_event(
2989                            log_ordering_wakeup_tx.clone(),
2990                            None,
2991                            ModuleRecoveryStarted {
2992                                module_id: module_instance_id,
2993                            },
2994                        )
2995                        .await;
2996                        dbtx.insert_entry(
2997                            &ClientModuleRecovery { module_instance_id },
2998                            &ClientModuleRecoveryState { progress },
2999                        )
3000                        .await;
3001
3002                        dbtx.commit_tx().await;
3003
3004                        debug!(
3005                            id = %module_instance_id,
3006                            %kind, "Starting new module recovery"
3007                        );
3008                        Some(start_module_recover_fn(snapshot, progress))
3009                    }
3010                } else {
3011                    None
3012                };
3013
3014                if let Some((recovery, recovery_progress_rx)) = recovery {
3015                    module_recoveries.insert(module_instance_id, recovery);
3016                    module_recovery_progress_receivers
3017                        .insert(module_instance_id, recovery_progress_rx);
3018                } else {
3019                    let module = module_init
3020                        .init(
3021                            final_client.clone(),
3022                            fed_id,
3023                            config.global.api_endpoints.len(),
3024                            module_config,
3025                            db.clone(),
3026                            module_instance_id,
3027                            common_api_versions.core,
3028                            api_version,
3029                            // This is a divergence from the legacy client, where the child secret
3030                            // keys were derived using *module kind*-specific derivation paths.
3031                            // Since the new client has to support multiple, segregated modules of
3032                            // the same kind we have to use the instance id instead.
3033                            root_secret.derive_module_secret(module_instance_id),
3034                            notifier.clone(),
3035                            api.clone(),
3036                            self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
3037                            task_group.clone(),
3038                        )
3039                        .await?;
3040
3041                    if primary_module_instance == module_instance_id
3042                        && !module.supports_being_primary()
3043                    {
3044                        bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module");
3045                    }
3046
3047                    modules.register_module(module_instance_id, kind, module);
3048                }
3049            }
3050            modules
3051        };
3052
3053        if init_state.is_pending() && module_recoveries.is_empty() {
3054            let mut dbtx = db.begin_transaction().await;
3055            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
3056                .await;
3057            dbtx.commit_tx().await;
3058        }
3059
3060        let executor = {
3061            let mut executor_builder = Executor::builder();
3062            executor_builder
3063                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
3064
3065            for (module_instance_id, _, module) in modules.iter_modules() {
3066                executor_builder.with_module_dyn(module.context(module_instance_id));
3067            }
3068
3069            for module_instance_id in module_recoveries.keys() {
3070                executor_builder.with_valid_module_id(*module_instance_id);
3071            }
3072
3073            executor_builder.build(db.clone(), notifier, task_group.clone())
3074        };
3075
3076        let recovery_receiver_init_val = module_recovery_progress_receivers
3077            .iter()
3078            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
3079            .collect::<BTreeMap<_, _>>();
3080        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
3081            watch::channel(recovery_receiver_init_val);
3082
3083        let client_inner = Arc::new(Client {
3084            config: RwLock::new(config.clone()),
3085            api_secret,
3086            decoders,
3087            db: db.clone(),
3088            federation_id: fed_id,
3089            federation_config_meta: config.global.meta,
3090            primary_module_instance,
3091            modules,
3092            module_inits: self.module_inits.clone(),
3093            log_ordering_wakeup_tx,
3094            log_event_added_rx,
3095            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
3096            executor,
3097            api,
3098            secp_ctx: Secp256k1::new(),
3099            root_secret,
3100            task_group,
3101            operation_log: OperationLog::new(db.clone()),
3102            client_recovery_progress_receiver,
3103            meta_service: self.meta_service,
3104            connector,
3105        });
3106        client_inner
3107            .task_group
3108            .spawn_cancellable("MetaService::update_continuously", {
3109                let client_inner = client_inner.clone();
3110                async move {
3111                    client_inner
3112                        .meta_service
3113                        .update_continuously(&client_inner)
3114                        .await;
3115                }
3116            });
3117
3118        client_inner.task_group.spawn_cancellable(
3119            "update-api-announcements",
3120            run_api_announcement_sync(client_inner.clone()),
3121        );
3122
3123        client_inner.task_group.spawn_cancellable(
3124            "event log ordering task",
3125            run_event_log_ordering_task(
3126                db.clone(),
3127                log_ordering_wakeup_rx,
3128                log_event_added_tx,
3129                log_event_added_transient_tx,
3130            ),
3131        );
3132        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
3133
3134        let client_arc = ClientHandle::new(client_inner);
3135
3136        for (_, _, module) in client_arc.modules.iter_modules() {
3137            module.start().await;
3138        }
3139
3140        final_client.set(client_iface.clone());
3141
3142        if !module_recoveries.is_empty() {
3143            client_arc.spawn_module_recoveries_task(
3144                client_recovery_progress_sender,
3145                module_recoveries,
3146                module_recovery_progress_receivers,
3147            );
3148        }
3149
3150        Ok(client_arc)
3151    }
3152
3153    async fn load_init_state(db: &Database) -> InitState {
3154        let mut dbtx = db.begin_transaction_nc().await;
3155        dbtx.get_value(&ClientInitStateKey)
3156            .await
3157            .unwrap_or_else(|| {
3158                // could be turned in a hard error in the future, but for now
3159                // no need to break backward compat.
3160                warn!(
3161                    target: LOG_CLIENT,
3162                    "Client missing ClientRequiresRecovery: assuming complete"
3163                );
3164                db::InitState::Complete(db::InitModeComplete::Fresh)
3165            })
3166    }
3167
3168    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
3169        let mut decoders = client_decoders(
3170            &self.module_inits,
3171            config
3172                .modules
3173                .iter()
3174                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
3175        );
3176
3177        decoders.register_module(
3178            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
3179            ModuleKind::from_static_str("tx_submission"),
3180            tx_submission_sm_decoder(),
3181        );
3182
3183        decoders
3184    }
3185
3186    fn config_decoded(
3187        config: &ClientConfig,
3188        decoders: &ModuleDecoderRegistry,
3189    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
3190        config.clone().redecode_raw(decoders)
3191    }
3192
3193    /// Re-derive client's `root_secret` using the federation ID. This
3194    /// eliminates the possibility of having the same client `root_secret`
3195    /// across multiple federations.
3196    fn federation_root_secret(
3197        root_secret: &DerivableSecret,
3198        config: &ClientConfig,
3199    ) -> DerivableSecret {
3200        root_secret.federation_key(&config.global.calculate_federation_id())
3201    }
3202
3203    /// Register to receiver all new transient (unpersisted) events
3204    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
3205        self.log_event_added_transient_tx.subscribe()
3206    }
3207}
3208
3209/// Fetches the encoded client secret from the database and decodes it.
3210/// If an encoded client secret is not present in the database, or if
3211/// decoding fails, an error is returned.
3212pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
3213    let mut tx = db.begin_transaction_nc().await;
3214    let client_secret = tx.get_value(&EncodedClientSecretKey).await;
3215
3216    match client_secret {
3217        Some(client_secret) => {
3218            T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
3219                .map_err(|e| anyhow!("Decoding failed: {e}"))
3220        }
3221        None => bail!("Encoded client secret not present in DB"),
3222    }
3223}
3224
3225pub fn client_decoders<'a>(
3226    registry: &ModuleInitRegistry<DynClientModuleInit>,
3227    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
3228) -> ModuleDecoderRegistry {
3229    let mut modules = BTreeMap::new();
3230    for (id, kind) in module_kinds {
3231        let Some(init) = registry.get(kind) else {
3232            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
3233            continue;
3234        };
3235
3236        modules.insert(
3237            id,
3238            (
3239                kind.clone(),
3240                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
3241            ),
3242        );
3243    }
3244    ModuleDecoderRegistry::from(modules)
3245}