1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::explicit_deref_methods)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::return_self_not_must_use)]
10#![allow(clippy::too_many_lines)]
11#![allow(clippy::type_complexity)]
12
13use std::collections::{BTreeMap, HashSet};
83use std::fmt::{Debug, Formatter};
84use std::future::pending;
85use std::ops::{self, Range};
86use std::pin::Pin;
87use std::sync::{Arc, Weak};
88use std::time::Duration;
89
90use anyhow::{anyhow, bail, ensure, format_err, Context};
91use api::ClientRawFederationApiExt as _;
92use async_stream::{stream, try_stream};
93use backup::ClientBackup;
94use bitcoin::secp256k1;
95use db::event_log::{
96 self, run_event_log_ordering_task, DBTransactionEventLogExt, Event, EventKind, EventLogEntry,
97 EventLogId,
98};
99use db::{
100 apply_migrations_client, apply_migrations_core_client, get_core_client_database_migrations,
101 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientInitStateKey,
102 ClientModuleRecovery, ClientPreRootSecretHashKey, EncodedClientSecretKey, InitMode,
103 PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey,
104};
105use fedimint_api_client::api::net::Connector;
106use fedimint_api_client::api::{
107 ApiVersionSet, DynGlobalApi, DynModuleApi, FederationApiExt, GlobalFederationApiWithCacheExt,
108 IGlobalFederationApi, WsFederationApi,
109};
110use fedimint_core::config::{
111 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
112};
113use fedimint_core::core::{
114 DynInput, DynOutput, IInput, IOutput, IntoDynInstance as _, ModuleInstanceId, ModuleKind,
115 OperationId,
116};
117use fedimint_core::db::{
118 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
119 IDatabaseTransactionOpsCoreTyped, NonCommittable,
120};
121use fedimint_core::encoding::{Decodable, Encodable};
122use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
123use fedimint_core::invite_code::InviteCode;
124use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
125use fedimint_core::module::{
126 ApiAuth, ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
127 SupportedCoreApiVersions, SupportedModuleApiVersions,
128};
129use fedimint_core::net::api_announcement::SignedApiAnnouncement;
130use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
131use fedimint_core::transaction::Transaction;
132use fedimint_core::util::{backoff_util, retry, BoxStream, NextOrPending, SafeUrl};
133use fedimint_core::{
134 apply, async_trait_maybe_send, dyn_newtype_define, fedimint_build_code_version_env,
135 maybe_add_send, maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
136 TransactionId,
137};
138pub use fedimint_derive_secret as derivable_secret;
139use fedimint_derive_secret::DerivableSecret;
140use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
141use futures::stream::FuturesUnordered;
142use futures::{Future, Stream, StreamExt};
143use meta::{LegacyMetaSource, MetaService};
144use module::recovery::RecoveryProgress;
145use module::{DynClientModule, FinalClient};
146use rand::thread_rng;
147use secp256k1::{PublicKey, Secp256k1};
148use secret::{DeriveableSecretClientExt, PlainRootSecretStrategy, RootSecretStrategy as _};
149use serde::{Deserialize, Serialize};
150use thiserror::Error;
151#[cfg(not(target_family = "wasm"))]
152use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
153use tokio::sync::{broadcast, watch, RwLock};
154use tokio_stream::wrappers::WatchStream;
155use tracing::{debug, error, info, trace, warn};
156use transaction::{
157 ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputSM, TxSubmissionStatesSM,
158};
159
160use crate::api_announcements::{get_api_urls, run_api_announcement_sync, ApiAnnouncementPrefix};
161use crate::api_version_discovery::discover_common_api_versions_set;
162use crate::backup::Metadata;
163use crate::db::{ClientMetadataKey, ClientModuleRecoveryState, InitState, OperationLogKey};
164use crate::module::init::{
165 ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
166};
167use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
168use crate::oplog::OperationLog;
169use crate::sm::executor::{
170 ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
171};
172use crate::sm::{ClientSMDatabaseTransaction, DynState, Executor, IState, Notifier, State};
173use crate::transaction::{
174 tx_submission_sm_decoder, ClientInput, ClientOutputBundle, TransactionBuilder,
175 TxSubmissionContext, TxSubmissionStates, TRANSACTION_SUBMISSION_MODULE_INSTANCE,
176};
177
178pub mod api;
179
180pub mod backup;
182pub mod db;
184pub mod envs;
186pub mod module;
188pub mod oplog;
190pub mod secret;
192pub mod sm;
194pub mod transaction;
196
197mod api_version_discovery;
198
199pub mod api_announcements;
200pub mod meta;
202
/// Event payload recording that a transaction was created for an operation.
///
/// Logged by `Client::finalize_and_submit_transaction_inner` right after the
/// transaction's state machines were handed to the executor.
#[derive(Serialize, Deserialize)]
pub struct TxCreatedEvent {
    /// Id of the created transaction.
    txid: TransactionId,
    /// Operation the transaction belongs to.
    operation_id: OperationId,
}

impl Event for TxCreatedEvent {
    // `None`: the event is not attributed to any module kind.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("tx-created");
}
214
/// Event payload of kind `tx-accepted`, identifying a transaction and the
/// operation it belongs to.
///
/// NOTE(review): presumably logged once the federation accepted the
/// transaction; the emitting code is not in this part of the file.
#[derive(Serialize, Deserialize)]
pub struct TxAcceptedEvent {
    /// Id of the transaction.
    txid: TransactionId,
    /// Operation the transaction belongs to.
    operation_id: OperationId,
}

impl Event for TxAcceptedEvent {
    // `None`: the event is not attributed to any module kind.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("tx-accepted");
}
226
/// Event payload of kind `tx-rejected`, identifying a transaction, the
/// operation it belongs to and an error description.
///
/// NOTE(review): presumably logged once the federation rejected the
/// transaction; the emitting code is not in this part of the file.
#[derive(Serialize, Deserialize)]
pub struct TxRejectedEvent {
    /// Id of the transaction.
    txid: TransactionId,
    /// Textual description of the error.
    error: String,
    /// Operation the transaction belongs to.
    operation_id: OperationId,
}
impl Event for TxRejectedEvent {
    // `None`: the event is not attributed to any module kind.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("tx-rejected");
}
238
/// Event payload of kind `module-recovery-started`, naming a module instance.
///
/// NOTE(review): presumably logged when recovery of that module instance
/// begins; the emitting code is not in this part of the file.
#[derive(Serialize, Deserialize)]
pub struct ModuleRecoveryStarted {
    /// Instance id of the module the event refers to.
    module_id: ModuleInstanceId,
}

impl Event for ModuleRecoveryStarted {
    // `None`: the event itself is not attributed to a module kind; the module
    // is identified by the payload instead.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("module-recovery-started");
}
249
/// Event payload of kind `module-recovery-completed`, naming a module
/// instance.
///
/// NOTE(review): presumably logged when recovery of that module instance
/// finishes; the emitting code is not in this part of the file.
#[derive(Serialize, Deserialize)]
pub struct ModuleRecoveryCompleted {
    /// Instance id of the module the event refers to.
    module_id: ModuleInstanceId,
}

impl Event for ModuleRecoveryCompleted {
    // `None`: the event itself is not attributed to a module kind; the module
    // is identified by the payload instead.
    const MODULE: Option<ModuleKind> = None;

    const KIND: EventKind = EventKind::from_static("module-recovery-completed");
}
260
// The "instanceless" aliases below describe inputs, outputs and state machines
// that are already type-erased (boxed trait objects) but not yet tagged with a
// module instance id. `ModuleGlobalClientContext` attaches the id later via
// `into_dyn(module_instance_id)` / `DynState::from_parts`.

/// A [`ClientInput`] holding a boxed `dyn IInput`.
pub type InstancelessDynClientInput = ClientInput<Box<maybe_add_send_sync!(dyn IInput + 'static)>>;

/// A [`ClientInputSM`] whose state machines are boxed `dyn IState`.
pub type InstancelessDynClientInputSM =
    ClientInputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;

/// A [`ClientInputBundle`] of boxed `dyn IInput` inputs and boxed `dyn IState`
/// state machines.
pub type InstancelessDynClientInputBundle = ClientInputBundle<
    Box<maybe_add_send_sync!(dyn IInput + 'static)>,
    Box<maybe_add_send_sync!(dyn IState + 'static)>,
>;

/// A [`ClientOutput`] holding a boxed `dyn IOutput`.
pub type InstancelessDynClientOutput =
    ClientOutput<Box<maybe_add_send_sync!(dyn IOutput + 'static)>>;

/// A [`ClientOutputSM`] whose state machines are boxed `dyn IState`.
pub type InstancelessDynClientOutputSM =
    ClientOutputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
/// A [`ClientOutputBundle`] of boxed `dyn IOutput` outputs and boxed
/// `dyn IState` state machines.
pub type InstancelessDynClientOutputBundle = ClientOutputBundle<
    Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
    Box<maybe_add_send_sync!(dyn IState + 'static)>,
>;
280
/// Error returned when adding state machines fails.
#[derive(Debug, Error)]
pub enum AddStateMachinesError {
    /// The state to be added is already present in the database.
    #[error("State already exists in database")]
    StateAlreadyExists,
    /// Any other failure; convertible from [`anyhow::Error`] (e.g. via `?`).
    #[error("Got {0}")]
    Other(#[from] anyhow::Error),
}

/// Result of adding state machines: `()` on success, otherwise an
/// [`AddStateMachinesError`].
pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
290
/// Type-erasable interface to client-wide functionality: federation API,
/// config, decoders, transaction submission, state machines and the event
/// log.
///
/// It is used through [`DynGlobalClientContext`]. The client-backed
/// implementation in this file is bound to one module instance and one
/// operation; the `()` implementation is a panicking stand-in.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
    /// Returns a module API handle. The client-backed implementation scopes
    /// the global API to its own module instance.
    fn module_api(&self) -> DynModuleApi;

    /// Returns the client config.
    async fn client_config(&self) -> ClientConfig;

    /// Returns the global federation API.
    fn api(&self) -> &DynGlobalApi;

    /// Returns the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;

    /// Type-erased form of `DynGlobalClientContext::claim_inputs`.
    ///
    /// The client-backed implementation builds a transaction from `inputs`,
    /// finalizes and submits it as part of `dbtx`, and returns the transaction
    /// id together with the out points of the change outputs added while
    /// finalizing.
    async fn claim_inputs_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        inputs: InstancelessDynClientInputBundle,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;

    /// Type-erased form of `DynGlobalClientContext::fund_output`.
    ///
    /// The client-backed implementation builds a transaction from `outputs`,
    /// finalizes and submits it as part of `dbtx`, and returns the transaction
    /// id together with the out points of the change outputs added while
    /// finalizing.
    async fn fund_output_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        outputs: InstancelessDynClientOutputBundle,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;

    /// Type-erased form of `DynGlobalClientContext::add_state_machine`: adds
    /// the boxed state machine `sm` as part of `dbtx`.
    async fn add_state_machine_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult;

    /// Logs an event with a JSON payload as part of `dbtx`.
    ///
    /// NOTE(review): the last parameter is named `transient`, yet
    /// `DynGlobalClientContext::log_event` passes `Event::PERSIST` for it.
    /// The name may be inverted relative to the meaning implementations
    /// expect; confirm against `Client::log_event_raw_dbtx`.
    async fn log_event_json(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        kind: EventKind,
        module: Option<(ModuleKind, ModuleInstanceId)>,
        payload: serde_json::Value,
        transient: bool,
    );

    /// Returns a stream of transaction submission state machine updates. The
    /// client-backed implementation subscribes for its own operation.
    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM>;
}
346
/// Stand-in implementation, intended for tests only (see the panic messages):
/// every method panics via `unimplemented!` when called.
///
/// `DynGlobalClientContext::new_fake` wraps this implementation.
#[apply(async_trait_maybe_send!)]
impl IGlobalClientContext for () {
    fn module_api(&self) -> DynModuleApi {
        unimplemented!("fake implementation, only for tests");
    }

    async fn client_config(&self) -> ClientConfig {
        unimplemented!("fake implementation, only for tests");
    }

    fn api(&self) -> &DynGlobalApi {
        unimplemented!("fake implementation, only for tests");
    }

    fn decoders(&self) -> &ModuleDecoderRegistry {
        unimplemented!("fake implementation, only for tests");
    }

    async fn claim_inputs_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _input: InstancelessDynClientInputBundle,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
        unimplemented!("fake implementation, only for tests");
    }

    async fn fund_output_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _outputs: InstancelessDynClientOutputBundle,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
        unimplemented!("fake implementation, only for tests");
    }

    async fn add_state_machine_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult {
        unimplemented!("fake implementation, only for tests");
    }

    async fn log_event_json(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _kind: EventKind,
        _module: Option<(ModuleKind, ModuleInstanceId)>,
        _payload: serde_json::Value,
        _transient: bool,
    ) {
        unimplemented!("fake implementation, only for tests");
    }

    async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
        unimplemented!("fake implementation, only for tests");
    }
}
404
// Declares `DynGlobalClientContext`, a cloneable newtype around an `Arc` of a
// type-erased `IGlobalClientContext`. The exact items generated are up to the
// `dyn_newtype_define!` macro; the `From<Arc<T>>` impl in this file shows that
// the wrapped value lives in a field called `inner`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalClientContext(Arc<IGlobalClientContext>)
}
411
412impl DynGlobalClientContext {
413 pub fn new_fake() -> Self {
414 DynGlobalClientContext::from(())
415 }
416
417 pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
418 self.transaction_update_stream()
419 .await
420 .filter_map(|tx_update| {
421 std::future::ready(match tx_update.state {
422 TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
423 TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
424 Some(Err(submit_error))
425 }
426 _ => None,
427 })
428 })
429 .next_or_pending()
430 .await
431 }
432
433 pub async fn claim_inputs<I, S>(
434 &self,
435 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
436 inputs: ClientInputBundle<I, S>,
437 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
438 where
439 I: IInput + MaybeSend + MaybeSync + 'static,
440 S: IState + MaybeSend + MaybeSync + 'static,
441 {
442 self.claim_inputs_dyn(dbtx, inputs.into_instanceless())
443 .await
444 }
445
446 pub async fn fund_output<O, S>(
455 &self,
456 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
457 outputs: ClientOutputBundle<O, S>,
458 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
459 where
460 O: IOutput + MaybeSend + MaybeSync + 'static,
461 S: IState + MaybeSend + MaybeSync + 'static,
462 {
463 self.fund_output_dyn(dbtx, outputs.into_instanceless())
464 .await
465 }
466
467 pub async fn add_state_machine<S>(
471 &self,
472 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
473 sm: S,
474 ) -> AddStateMachinesResult
475 where
476 S: State + MaybeSend + MaybeSync + 'static,
477 {
478 self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
479 }
480
481 async fn log_event<E>(&self, dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, event: E)
482 where
483 E: Event + Send,
484 {
485 self.log_event_json(
486 dbtx,
487 E::KIND,
488 E::MODULE.map(|m| (m, dbtx.module_id())),
489 serde_json::to_value(&event).expect("Payload serialization can't fail"),
490 <E as Event>::PERSIST,
491 )
492 .await;
493 }
494}
495
496fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>(
497 state_gen: StateGenerator<S>,
498) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> {
499 Arc::new(move |out_point_range| {
500 let states: Vec<S> = state_gen(out_point_range);
501 states
502 .into_iter()
503 .map(|state| box_up_state(state))
504 .collect()
505 })
506}
507
/// Moves `state` onto the heap and returns it as a boxed `dyn IState` trait
/// object.
fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> {
    Box::new(state)
}
513
514impl<T> From<Arc<T>> for DynGlobalClientContext
515where
516 T: IGlobalClientContext,
517{
518 fn from(inner: Arc<T>) -> Self {
519 DynGlobalClientContext { inner }
520 }
521}
522
523impl Debug for Client {
525 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
526 write!(f, "Client")
527 }
528}
529
/// [`IGlobalClientContext`] implementation backed by a real [`Client`], bound
/// to one module instance and one operation.
///
/// Instances are produced by the generator returned from
/// `Client::context_gen`.
#[derive(Clone, Debug)]
struct ModuleGlobalClientContext {
    /// The client all calls are forwarded to.
    client: Arc<Client>,
    /// Module instance that inputs, outputs and state machines are attributed
    /// to.
    module_instance_id: ModuleInstanceId,
    /// Operation that submitted transactions and update streams belong to.
    operation: OperationId,
}
539
540#[apply(async_trait_maybe_send!)]
541impl IGlobalClientContext for ModuleGlobalClientContext {
542 fn module_api(&self) -> DynModuleApi {
543 self.api().with_module(self.module_instance_id)
544 }
545
546 fn api(&self) -> &DynGlobalApi {
547 &self.client.api
548 }
549
550 fn decoders(&self) -> &ModuleDecoderRegistry {
551 self.client.decoders()
552 }
553
554 async fn client_config(&self) -> ClientConfig {
555 self.client.config().await
556 }
557
558 async fn claim_inputs_dyn(
559 &self,
560 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
561 inputs: InstancelessDynClientInputBundle,
562 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
563 let tx_builder =
564 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
565
566 self.client
567 .finalize_and_submit_transaction_inner(
568 &mut dbtx.global_tx().to_ref_nc(),
569 self.operation,
570 tx_builder,
571 )
572 .await
573 }
574
575 async fn fund_output_dyn(
576 &self,
577 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
578 outputs: InstancelessDynClientOutputBundle,
579 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
580 let tx_builder =
581 TransactionBuilder::new().with_outputs(outputs.into_dyn(self.module_instance_id));
582
583 self.client
584 .finalize_and_submit_transaction_inner(
585 &mut dbtx.global_tx().to_ref_nc(),
586 self.operation,
587 tx_builder,
588 )
589 .await
590 }
591
592 async fn add_state_machine_dyn(
593 &self,
594 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
595 sm: Box<maybe_add_send_sync!(dyn IState)>,
596 ) -> AddStateMachinesResult {
597 let state = DynState::from_parts(self.module_instance_id, sm);
598
599 self.client
600 .executor
601 .add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
602 .await
603 }
604
605 async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
606 self.client.transaction_update_stream(self.operation).await
607 }
608
609 async fn log_event_json(
610 &self,
611 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
612 kind: EventKind,
613 module: Option<(ModuleKind, ModuleInstanceId)>,
614 payload: serde_json::Value,
615 transient: bool,
616 ) {
617 self.client
618 .log_event_raw_dbtx(
619 dbtx.global_tx(),
620 kind,
621 module,
622 serde_json::to_vec(&payload).expect("Serialization can't fail"),
623 transient,
624 )
625 .await;
626 }
627}
628
629fn states_add_instance(
630 module_instance_id: ModuleInstanceId,
631 state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
632) -> StateGenerator<DynState> {
633 Arc::new(move |out_point_range| {
634 let states = state_gen(out_point_range);
635 Iterator::collect(
636 states
637 .into_iter()
638 .map(|state| DynState::from_parts(module_instance_id, state)),
639 )
640 })
641}
642
/// Owning handle to a [`Client`].
///
/// Dereferences to the client. Shutting the handle down (explicitly via
/// `shutdown`, or on drop where blocking is possible) stops the executor and
/// joins the client's task group.
#[derive(Debug)]
pub struct ClientHandle {
    /// The wrapped client; `None` only after shutdown has taken it.
    inner: Option<Arc<Client>>,
}

/// A [`ClientHandle`] shared behind an [`Arc`].
pub type ClientHandleArc = Arc<ClientHandle>;
659
660impl ClientHandle {
661 fn new(inner: Arc<Client>) -> Self {
663 ClientHandle {
664 inner: inner.into(),
665 }
666 }
667
668 fn as_inner(&self) -> &Arc<Client> {
669 self.inner.as_ref().expect("Inner always set")
670 }
671
672 pub fn start_executor(&self) {
673 self.as_inner().start_executor();
674 }
675
676 pub async fn shutdown(mut self) {
678 self.shutdown_inner().await;
679 }
680
681 async fn shutdown_inner(&mut self) {
682 let Some(inner) = self.inner.take() else {
683 error!("ClientHandleShared::shutdown called twice");
684 return;
685 };
686 inner.executor.stop_executor();
687 let db = inner.db.clone();
688 debug!(target: LOG_CLIENT, "Waiting for client task group to shut down");
689 if let Err(err) = inner
690 .task_group
691 .clone()
692 .shutdown_join_all(Some(Duration::from_secs(30)))
693 .await
694 {
695 warn!(target: LOG_CLIENT, %err, "Error waiting for client task group to shut down");
696 }
697
698 let client_strong_count = Arc::strong_count(&inner);
699 debug!(target: LOG_CLIENT, "Dropping last handle to Client");
700 drop(inner);
703
704 if client_strong_count != 1 {
705 debug!(target: LOG_CLIENT, count = client_strong_count - 1, LOG_CLIENT, "External Client references remaining after last handle dropped");
706 }
707
708 let db_strong_count = db.strong_count();
709 if db_strong_count != 1 {
710 debug!(target: LOG_CLIENT, count = db_strong_count - 1, "External DB references remaining after last handle dropped");
711 }
712 trace!(target: LOG_CLIENT, "Dropped last handle to Client");
713 }
714
715 pub async fn restart(self) -> anyhow::Result<ClientHandle> {
723 let (builder, config, api_secret, root_secret) = {
724 let client = self
725 .inner
726 .as_ref()
727 .ok_or_else(|| format_err!("Already stopped"))?;
728 let builder = ClientBuilder::from_existing(client);
729 let config = client.config().await;
730 let api_secret = client.api_secret.clone();
731 let root_secret = client.root_secret.clone();
732
733 (builder, config, api_secret, root_secret)
734 };
735 self.shutdown().await;
736
737 builder.build(root_secret, config, api_secret, false).await
738 }
739}
740
741impl ops::Deref for ClientHandle {
742 type Target = Client;
743
744 fn deref(&self) -> &Self::Target {
745 self.inner.as_ref().expect("Must have inner client set")
746 }
747}
748
749impl ClientHandle {
750 pub(crate) fn downgrade(&self) -> ClientWeak {
751 ClientWeak {
752 inner: Arc::downgrade(self.inner.as_ref().expect("Inner always set")),
753 }
754 }
755}
756
757#[derive(Debug, Clone)]
759pub(crate) struct ClientStrong {
760 inner: Arc<Client>,
761}
762
763impl ops::Deref for ClientStrong {
764 type Target = Client;
765
766 fn deref(&self) -> &Self::Target {
767 self.inner.deref()
768 }
769}
770
771#[derive(Debug, Clone)]
775pub(crate) struct ClientWeak {
776 inner: Weak<Client>,
777}
778
779impl ClientWeak {
780 pub fn upgrade(&self) -> Option<ClientStrong> {
781 Weak::upgrade(&self.inner).map(|inner| ClientStrong { inner })
782 }
783}
784
/// Shuts the client down when a handle is dropped without an explicit
/// `shutdown`.
///
/// A full shutdown needs to block on async code. Where that is not possible
/// (wasm, or a current-thread Tokio runtime) only the executor is stopped and
/// an error is logged asking for a manual `ClientHandle::shutdown`.
impl Drop for ClientHandle {
    fn drop(&mut self) {
        // The client was already taken by a shutdown; nothing left to do.
        if self.inner.is_none() {
            return;
        }

        // Exactly one of the two `can_block` definitions survives `cfg`
        // evaluation.
        #[cfg(target_family = "wasm")]
        let can_block = false;
        // NOTE(review): per Tokio's documentation `Handle::current()` panics
        // when called outside a runtime, so dropping a live handle outside a
        // runtime would panic here; confirm callers never do that.
        #[cfg(not(target_family = "wasm"))]
        let can_block = RuntimeHandle::current().runtime_flavor() != RuntimeFlavor::CurrentThread;
        if !can_block {
            // Best effort: stop the executor without joining the task group.
            let inner = self.inner.take().expect("Must have inner client set");
            inner.executor.stop_executor();
            if cfg!(target_family = "wasm") {
                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on wasm, call ClientHandle::shutdown manually.");
            } else {
                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on current thread runtime, call ClientHandle::shutdown manually.");
            }
            return;
        }

        debug!(target: LOG_CLIENT, "Shutting down the Client on last handle drop");
        // Run the async shutdown to completion from this synchronous context.
        #[cfg(not(target_family = "wasm"))]
        runtime::block_in_place(|| {
            runtime::block_on(self.shutdown_inner());
        });
    }
}
820
/// Core API versions listed as supported by this client: currently only
/// major 0, minor 0.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];

/// Alias for the executor's [`ContextGen`]; `Client::context_gen` builds one
/// that yields a global context per module instance and operation.
pub type ModuleGlobalContextGen = ContextGen;
828
829pub struct ClientModuleInstance<'m, M: ClientModule> {
831 pub id: ModuleInstanceId,
833 pub db: Database,
835 pub api: DynModuleApi,
837
838 module: &'m M,
839}
840
841impl<'m, M: ClientModule> ClientModuleInstance<'m, M> {
842 pub fn inner(&self) -> &'m M {
844 self.module
845 }
846}
847
848impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
849where
850 M: ClientModule,
851{
852 type Target = M;
853
854 fn deref(&self) -> &Self::Target {
855 self.module
856 }
857}
858
/// The client state shared behind `Arc`s: federation connection, modules,
/// state machine executor, databases and background tasks.
///
/// Users normally reach it through a [`ClientHandle`], which dereferences to
/// this type.
pub struct Client {
    /// Client config; `Client::config` hands out clones.
    config: RwLock<ClientConfig>,
    /// Optional API secret, exposed by `Client::api_secret`.
    api_secret: Option<String>,
    /// Module decoder registry, exposed by `Client::decoders`.
    decoders: ModuleDecoderRegistry,
    /// Client database.
    db: Database,
    /// Id of the federation, exposed by `Client::federation_id`.
    federation_id: FederationId,
    /// Key/value metadata looked up by `Client::get_meta`.
    federation_meta: BTreeMap<String, String>,
    /// Instance id handed to the primary module when transactions are
    /// finalized.
    primary_module_instance: ModuleInstanceId,
    /// Module instances, looked up by instance id.
    modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    /// State machine executor.
    executor: Executor,
    /// Global federation API.
    api: DynGlobalApi,
    /// Root secret of the client.
    root_secret: DerivableSecret,
    /// Operation log, exposed by `Client::operation_log`.
    operation_log: OperationLog,
    /// secp256k1 context used when building transactions and deriving the
    /// internal payment marker key.
    secp_ctx: Secp256k1<secp256k1::All>,
    /// Meta service, exposed by `Client::meta_service`.
    meta_service: Arc<MetaService>,
    connector: Connector,

    /// Task group of the client; shut down and joined by `ClientHandle`.
    task_group: TaskGroup,

    /// Watch channel carrying a [`RecoveryProgress`] per module instance.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // Event log signalling channels; the code using them is not in this part
    // of the file.
    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
}
903
904impl Client {
905 pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
908 apply_migrations_core_client(
909 &db,
910 "fedimint-client".to_string(),
911 get_core_client_database_migrations(),
912 )
913 .await?;
914 Ok(ClientBuilder::new(db))
915 }
916
/// Borrows the global federation API as a trait object.
pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
    self.api.as_ref()
}
920
/// Returns an owned clone of the global federation API handle.
pub fn api_clone(&self) -> DynGlobalApi {
    self.api.clone()
}
924
/// Borrows the task group of the client.
pub fn task_group(&self) -> &TaskGroup {
    &self.task_group
}
929
/// Borrows the state machine executor. Hidden from the generated
/// documentation.
#[doc(hidden)]
pub fn executor(&self) -> &Executor {
    &self.executor
}
935
936 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
937 let mut dbtx = db.begin_transaction_nc().await;
938 dbtx.get_value(&ClientConfigKey).await
939 }
940
941 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
942 let mut dbtx = db.begin_transaction_nc().await;
943 dbtx.get_value(&ApiSecretKey).await
944 }
945
946 pub async fn store_encodable_client_secret<T: Encodable>(
947 db: &Database,
948 secret: T,
949 ) -> anyhow::Result<()> {
950 let mut dbtx = db.begin_transaction().await;
951
952 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
954 bail!("Encoded client secret already exists, cannot overwrite")
955 }
956
957 let encoded_secret = T::consensus_encode_to_vec(&secret);
958 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
959 .await;
960 dbtx.commit_tx().await;
961 Ok(())
962 }
963
964 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
965 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
966 bail!("Encoded client secret not present in DB")
967 };
968
969 Ok(secret)
970 }
971 pub async fn load_decodable_client_secret_opt<T: Decodable>(
972 db: &Database,
973 ) -> anyhow::Result<Option<T>> {
974 let mut dbtx = db.begin_transaction_nc().await;
975
976 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
977
978 Ok(match client_secret {
979 Some(client_secret) => Some(
980 T::consensus_decode(&mut client_secret.as_slice(), &ModuleRegistry::default())
981 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
982 ),
983 None => None,
984 })
985 }
986
987 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
988 let client_secret =
989 if let Ok(secret) = Self::load_decodable_client_secret::<[u8; 64]>(db).await {
990 secret
991 } else {
992 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
993 Self::store_encodable_client_secret(db, secret)
994 .await
995 .expect("Storing client secret must work");
996 secret
997 };
998 Ok(client_secret)
999 }
1000
/// Reports whether `db` already contains a client config.
pub async fn is_initialized(db: &Database) -> bool {
    Self::get_config_from_db(db).await.is_some()
}
1004
1005 pub fn start_executor(self: &Arc<Self>) {
1006 debug!(
1007 "Starting fedimint client executor (version: {})",
1008 fedimint_build_code_version_env!()
1009 );
1010 self.executor.start_executor(self.context_gen());
1011 }
1012
/// Returns the id of the federation this client belongs to.
pub fn federation_id(&self) -> FederationId {
    self.federation_id
}
1016
1017 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
1018 let client_inner = Arc::downgrade(self);
1019 Arc::new(move |module_instance, operation| {
1020 ModuleGlobalClientContext {
1021 client: client_inner
1022 .clone()
1023 .upgrade()
1024 .expect("ModuleGlobalContextGen called after client was dropped"),
1025 module_instance_id: module_instance,
1026 operation,
1027 }
1028 .into()
1029 })
1030 }
1031
/// Returns a clone of the current client config, taken under the read lock.
pub async fn config(&self) -> ClientConfig {
    self.config.read().await.clone()
}
1035
/// Borrows the API secret, if one is set.
pub fn api_secret(&self) -> &Option<String> {
    &self.api_secret
}
1039
/// Borrows the module decoder registry.
pub fn decoders(&self) -> &ModuleDecoderRegistry {
    &self.decoders
}
1043
/// Returns the module registered under `instance`.
///
/// Panics if there is no such module; use `try_get_module` for a
/// non-panicking lookup.
fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
    self.try_get_module(instance)
        .expect("Module instance not found")
}
1049
/// Returns the module registered under `instance`, or `None` if there is no
/// such module.
fn try_get_module(
    &self,
    instance: ModuleInstanceId,
) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
    Some(self.modules.get(instance)?.as_ref())
}
1056
/// Reports whether a module is registered under `instance`.
pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
    self.modules.get(instance).is_some()
}
1060
1061 fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
1067 let mut in_amount = Amount::ZERO;
1069 let mut out_amount = Amount::ZERO;
1070 let mut fee_amount = Amount::ZERO;
1071
1072 for input in builder.inputs() {
1073 let module = self.get_module(input.input.module_instance_id());
1074
1075 let item_fee = module.input_fee(input.amount, &input.input).expect(
1076 "We only build transactions with input versions that are supported by the module",
1077 );
1078
1079 in_amount += input.amount;
1080 fee_amount += item_fee;
1081 }
1082
1083 for output in builder.outputs() {
1084 let module = self.get_module(output.output.module_instance_id());
1085
1086 let item_fee = module.output_fee(output.amount, &output.output).expect(
1087 "We only build transactions with output versions that are supported by the module",
1088 );
1089
1090 out_amount += output.amount;
1091 fee_amount += item_fee;
1092 }
1093
1094 (in_amount, out_amount + fee_amount)
1095 }
1096
1097 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1098 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
1099 }
1100
/// Looks up `key` in the federation metadata held by the client and returns
/// a copy of the value, if present.
pub fn get_meta(&self, key: &str) -> Option<String> {
    self.federation_meta.get(key).cloned()
}
1104
/// Returns a clone of the client's root secret.
fn root_secret(&self) -> DerivableSecret {
    self.root_secret.clone()
}
1108
/// Hands `states` to the executor as part of `dbtx`.
pub async fn add_state_machines(
    &self,
    dbtx: &mut DatabaseTransaction<'_>,
    states: Vec<DynState>,
) -> AddStateMachinesResult {
    self.executor.add_state_machines_dbtx(dbtx, states).await
}
1116
1117 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
1119 let active_states = self.executor.get_active_states().await;
1120 let mut active_operations = HashSet::with_capacity(active_states.len());
1121 let mut dbtx = self.db().begin_transaction_nc().await;
1122 for (state, _) in active_states {
1123 let operation_id = state.operation_id();
1124 if dbtx
1125 .get_value(&OperationLogKey { operation_id })
1126 .await
1127 .is_some()
1128 {
1129 active_operations.insert(operation_id);
1130 }
1131 }
1132 active_operations
1133 }
1134
/// Borrows the operation log of the client.
pub fn operation_log(&self) -> &OperationLog {
    &self.operation_log
}
1138
/// Borrows the shared meta service of the client.
pub fn meta_service(&self) -> &Arc<MetaService> {
    &self.meta_service
}
1143
1144 async fn finalize_transaction(
1146 &self,
1147 dbtx: &mut DatabaseTransaction<'_>,
1148 operation_id: OperationId,
1149 mut partial_transaction: TransactionBuilder,
1150 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
1151 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1152
1153 let (added_input_bundle, change_outputs) = self
1154 .primary_module()
1155 .create_final_inputs_and_outputs(
1156 self.primary_module_instance,
1157 dbtx,
1158 operation_id,
1159 input_amount,
1160 output_amount,
1161 )
1162 .await?;
1163
1164 let change_range = Range {
1168 start: partial_transaction.outputs().count() as u64,
1169 end: (partial_transaction.outputs().count() + change_outputs.outputs.len()) as u64,
1170 };
1171
1172 partial_transaction = partial_transaction
1173 .with_inputs(added_input_bundle)
1174 .with_outputs(change_outputs);
1175
1176 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1177
1178 assert!(input_amount >= output_amount, "Transaction is underfunded");
1179
1180 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
1181
1182 Ok((tx, states, change_range))
1183 }
1184
/// Finalizes `tx_builder` into a transaction, registers it for submission
/// and records a new operation log entry, all in one auto-committed database
/// transaction.
///
/// `operation_meta` is called with the transaction id and the change out
/// points to produce the metadata stored in the operation log entry.
///
/// Returns the transaction id and the change out points.
///
/// # Errors
///
/// Fails if an operation with `operation_id` already exists or if finalizing
/// and submitting the transaction fails.
///
/// # Panics
///
/// Panics if the database transaction still cannot be committed after the
/// configured number of attempts (100).
pub async fn finalize_and_submit_transaction<F, M>(
    &self,
    operation_id: OperationId,
    operation_type: &str,
    operation_meta: F,
    tx_builder: TransactionBuilder,
) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
where
    F: Fn(TransactionId, Vec<OutPoint>) -> M + Clone + MaybeSend + MaybeSync,
    M: serde::Serialize + MaybeSend,
{
    let operation_type = operation_type.to_owned();

    let autocommit_res = self
        .db
        .autocommit(
            |dbtx, _| {
                // The closure may be invoked once per commit attempt, so each
                // invocation works on its own clones.
                let operation_type = operation_type.clone();
                let tx_builder = tx_builder.clone();
                let operation_meta = operation_meta.clone();
                Box::pin(async move {
                    if Client::operation_exists_dbtx(dbtx, operation_id).await {
                        bail!("There already exists an operation with id {operation_id:?}")
                    }

                    let (txid, change) = self
                        .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                        .await?;

                    self.operation_log()
                        .add_operation_log_entry(
                            dbtx,
                            operation_id,
                            &operation_type,
                            operation_meta(txid, change.clone()),
                        )
                        .await;

                    Ok((txid, change))
                })
            },
            // Upper bound on commit attempts.
            Some(100),
        )
        .await;

    match autocommit_res {
        Ok(txid) => Ok(txid),
        // Errors raised inside the closure are handed back to the caller.
        Err(AutocommitError::ClosureError { error, .. }) => Err(error),
        Err(AutocommitError::CommitFailed {
            attempts,
            last_error,
        }) => panic!(
            "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
        ),
    }
}
1252
1253 async fn finalize_and_submit_transaction_inner(
1254 &self,
1255 dbtx: &mut DatabaseTransaction<'_>,
1256 operation_id: OperationId,
1257 tx_builder: TransactionBuilder,
1258 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
1259 let (transaction, mut states, change_range) = self
1260 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
1261 .await?;
1262
1263 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
1264 let inputs = transaction
1265 .inputs
1266 .iter()
1267 .map(DynInput::module_instance_id)
1268 .collect::<Vec<_>>();
1269 let outputs = transaction
1270 .outputs
1271 .iter()
1272 .map(DynOutput::module_instance_id)
1273 .collect::<Vec<_>>();
1274 warn!(
1275 target: LOG_CLIENT_NET_API,
1276 size=%transaction.consensus_encode_to_vec().len(),
1277 ?inputs,
1278 ?outputs,
1279 "Transaction too large",
1280 );
1281 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
1282 bail!(
1283 "The generated transaction would be rejected by the federation for being too large."
1284 );
1285 }
1286
1287 let txid = transaction.tx_hash();
1288
1289 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
1290
1291 let change_outpoints = change_range
1292 .into_iter()
1293 .map(|out_idx| OutPoint { txid, out_idx })
1294 .collect();
1295
1296 let tx_submission_sm = DynState::from_typed(
1297 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1298 TxSubmissionStatesSM {
1299 operation_id,
1300 state: TxSubmissionStates::Created(transaction),
1301 },
1302 );
1303 states.push(tx_submission_sm);
1304
1305 self.executor.add_state_machines_dbtx(dbtx, states).await?;
1306
1307 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
1308 .await;
1309
1310 Ok((txid, change_outpoints))
1311 }
1312
    /// Subscribes to `TxSubmissionStatesSM` updates for `operation_id`.
    ///
    /// The subscription is obtained from the executor's notifier for the
    /// `TRANSACTION_SUBMISSION_MODULE_INSTANCE` module instance.
    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(TRANSACTION_SUBMISSION_MODULE_INSTANCE)
            .subscribe(operation_id)
            .await
    }
1323
1324 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
1325 let mut dbtx = self.db().begin_transaction_nc().await;
1326
1327 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
1328 }
1329
1330 pub async fn operation_exists_dbtx(
1331 dbtx: &mut DatabaseTransaction<'_>,
1332 operation_id: OperationId,
1333 ) -> bool {
1334 let active_state_exists = dbtx
1335 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1336 .await
1337 .next()
1338 .await
1339 .is_some();
1340
1341 let inactive_state_exists = dbtx
1342 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
1343 .await
1344 .next()
1345 .await
1346 .is_some();
1347
1348 active_state_exists || inactive_state_exists
1349 }
1350
1351 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
1352 self.db
1353 .begin_transaction_nc()
1354 .await
1355 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1356 .await
1357 .next()
1358 .await
1359 .is_some()
1360 }
1361
1362 pub async fn await_primary_module_output(
1365 &self,
1366 operation_id: OperationId,
1367 out_point: OutPoint,
1368 ) -> anyhow::Result<()> {
1369 self.primary_module()
1370 .await_primary_module_output(operation_id, out_point)
1371 .await
1372 }
1373
1374 pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
1376 let module_kind = M::kind();
1377 let id = self
1378 .get_first_instance(&module_kind)
1379 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
1380 let module: &M = self
1381 .try_get_module(id)
1382 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
1383 .as_any()
1384 .downcast_ref::<M>()
1385 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
1386 let (db, _) = self.db().with_prefix_module_id(id);
1387 Ok(ClientModuleInstance {
1388 id,
1389 db,
1390 api: self.api().with_module(id),
1391 module,
1392 })
1393 }
1394
1395 pub fn get_module_client_dyn(
1396 &self,
1397 instance_id: ModuleInstanceId,
1398 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
1399 self.try_get_module(instance_id)
1400 .ok_or(anyhow!("Unknown module instance {}", instance_id))
1401 }
1402
    /// Returns a reference to the client's database handle.
    pub fn db(&self) -> &Database {
        &self.db
    }
1406
1407 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1410 TransactionUpdates {
1411 update_stream: self.transaction_update_stream(operation_id).await,
1412 }
1413 }
1414
1415 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1419 if self
1420 .modules
1421 .get_with_kind(self.primary_module_instance)
1422 .is_some_and(|(kind, _)| kind == module_kind)
1423 {
1424 return Some(self.primary_module_instance);
1425 }
1426
1427 self.modules
1428 .iter_modules()
1429 .find(|(_, kind, _module)| *kind == module_kind)
1430 .map(|(instance_id, _, _)| instance_id)
1431 }
1432
    /// Decodes the client secret stored in the database as `T`, by calling
    /// `get_decoded_client_secret` on the client's database.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
1438
1439 pub async fn await_primary_module_outputs(
1442 &self,
1443 operation_id: OperationId,
1444 outputs: Vec<OutPoint>,
1445 ) -> anyhow::Result<()> {
1446 for out_point in outputs {
1447 self.await_primary_module_output(operation_id, out_point)
1448 .await?;
1449 }
1450
1451 Ok(())
1452 }
1453
1454 pub async fn get_config_json(&self) -> JsonClientConfig {
1460 self.config().await.to_json()
1461 }
1462
1463 pub fn primary_module(&self) -> &DynClientModule {
1465 self.modules
1466 .get(self.primary_module_instance)
1467 .expect("primary module must be present")
1468 }
1469
1470 pub async fn get_balance(&self) -> Amount {
1472 self.primary_module()
1473 .get_balance(
1474 self.primary_module_instance,
1475 &mut self.db().begin_transaction_nc().await,
1476 )
1477 .await
1478 }
1479
1480 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
1483 let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
1484 let initial_balance = self.get_balance().await;
1485 let db = self.db().clone();
1486 let primary_module = self.primary_module().clone();
1487 let primary_module_instance = self.primary_module_instance;
1488
1489 Box::pin(stream! {
1490 yield initial_balance;
1491 let mut prev_balance = initial_balance;
1492 while let Some(()) = balance_changes.next().await {
1493 let mut dbtx = db.begin_transaction_nc().await;
1494 let balance = primary_module
1495 .get_balance(primary_module_instance, &mut dbtx)
1496 .await;
1497
1498 if balance != prev_balance {
1500 prev_balance = balance;
1501 yield balance;
1502 }
1503 }
1504 })
1505 }
1506
1507 pub async fn refresh_peers_api_versions(
1510 num_peers: NumPeers,
1511 api: DynGlobalApi,
1512 db: Database,
1513 num_responses_sender: watch::Sender<usize>,
1514 ) {
1515 async fn make_request(
1520 delay: Duration,
1521 peer_id: PeerId,
1522 api: &DynGlobalApi,
1523 ) -> (
1524 PeerId,
1525 Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
1526 ) {
1527 runtime::sleep(delay).await;
1528 (
1529 peer_id,
1530 api.request_single_peer_typed::<SupportedApiVersionsSummary>(
1531 None,
1532 VERSION_ENDPOINT.to_owned(),
1533 ApiRequestErased::default(),
1534 peer_id,
1535 )
1536 .await,
1537 )
1538 }
1539
1540 let mut requests = FuturesUnordered::new();
1543
1544 for peer_id in num_peers.peer_ids() {
1545 requests.push(make_request(Duration::ZERO, peer_id, &api));
1546 }
1547
1548 let mut num_responses = 0;
1549
1550 while let Some((peer_id, response)) = requests.next().await {
1551 match response {
1552 Err(err) => {
1553 if db
1554 .begin_transaction_nc()
1555 .await
1556 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1557 .await
1558 .is_some()
1559 {
1560 debug!(target: LOG_CLIENT, %peer_id, %err, "Failed to refresh API versions of a peer, but we have a previous response");
1561 } else {
1562 debug!(target: LOG_CLIENT, %peer_id, %err, "Failed to refresh API versions of a peer, will retry");
1563 requests.push(make_request(Duration::from_secs(15), peer_id, &api));
1564 }
1565 }
1566 Ok(o) => {
1567 let mut dbtx = db.begin_transaction().await;
1570 dbtx.insert_entry(
1571 &PeerLastApiVersionsSummaryKey(peer_id),
1572 &PeerLastApiVersionsSummary(o),
1573 )
1574 .await;
1575 dbtx.commit_tx().await;
1576 num_responses += 1;
1577 let _ = num_responses_sender.send(num_responses);
1579 }
1580 }
1581 }
1582 }
1583
1584 pub fn supported_api_versions_summary_static(
1586 config: &ClientConfig,
1587 client_module_init: &ClientModuleInitRegistry,
1588 ) -> SupportedApiVersionsSummary {
1589 SupportedApiVersionsSummary {
1590 core: SupportedCoreApiVersions {
1591 core_consensus: config.global.consensus_version,
1592 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1593 .expect("must not have conflicting versions"),
1594 },
1595 modules: config
1596 .modules
1597 .iter()
1598 .filter_map(|(&module_instance_id, module_config)| {
1599 client_module_init
1600 .get(module_config.kind())
1601 .map(|module_init| {
1602 (
1603 module_instance_id,
1604 SupportedModuleApiVersions {
1605 core_consensus: config.global.consensus_version,
1606 module_consensus: module_config.version,
1607 api: module_init.supported_api_versions(),
1608 },
1609 )
1610 })
1611 })
1612 .collect(),
1613 }
1614 }
1615
1616 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1617 Self::load_and_refresh_common_api_version_static(
1618 &self.config().await,
1619 &self.module_inits,
1620 &self.api,
1621 &self.db,
1622 &self.task_group,
1623 )
1624 .await
1625 }
1626
1627 async fn load_and_refresh_common_api_version_static(
1633 config: &ClientConfig,
1634 module_init: &ClientModuleInitRegistry,
1635 api: &DynGlobalApi,
1636 db: &Database,
1637 task_group: &TaskGroup,
1638 ) -> anyhow::Result<ApiVersionSet> {
1639 if let Some(v) = db
1640 .begin_transaction_nc()
1641 .await
1642 .get_value(&CachedApiVersionSetKey)
1643 .await
1644 {
1645 debug!("Found existing cached common api versions");
1646 let config = config.clone();
1647 let client_module_init = module_init.clone();
1648 let api = api.clone();
1649 let db = db.clone();
1650 let task_group = task_group.clone();
1651 task_group
1654 .clone()
1655 .spawn_cancellable("refresh_common_api_version_static", async move {
1656 if let Err(error) = Self::refresh_common_api_version_static(
1657 &config,
1658 &client_module_init,
1659 &api,
1660 &db,
1661 task_group,
1662 )
1663 .await
1664 {
1665 warn!(%error, "Failed to discover common api versions");
1666 }
1667 });
1668
1669 return Ok(v.0);
1670 }
1671
1672 debug!("No existing cached common api versions found, waiting for initial discovery");
1673 Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
1674 .await
1675 }
1676
1677 async fn refresh_common_api_version_static(
1678 config: &ClientConfig,
1679 client_module_init: &ClientModuleInitRegistry,
1680 api: &DynGlobalApi,
1681 db: &Database,
1682 task_group: TaskGroup,
1683 ) -> anyhow::Result<ApiVersionSet> {
1684 debug!("Refreshing common api versions");
1685
1686 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1687 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1688
1689 task_group.spawn_cancellable("refresh peers api versions", {
1690 Client::refresh_peers_api_versions(
1691 num_peers,
1692 api.clone(),
1693 db.clone(),
1694 num_responses_sender,
1695 )
1696 });
1697
1698 let _: Result<_, Elapsed> = runtime::timeout(
1706 Duration::from_secs(15),
1707 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1708 )
1709 .await;
1710
1711 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1712
1713 let common_api_versions = discover_common_api_versions_set(
1714 &Self::supported_api_versions_summary_static(config, client_module_init),
1715 &peer_api_version_sets,
1716 )?;
1717
1718 debug!(
1719 value = ?common_api_versions,
1720 "Updating the cached common api versions"
1721 );
1722 let mut dbtx = db.begin_transaction().await;
1723 let _ = dbtx
1724 .insert_entry(
1725 &CachedApiVersionSetKey,
1726 &CachedApiVersionSet(common_api_versions.clone()),
1727 )
1728 .await;
1729
1730 dbtx.commit_tx().await;
1731
1732 Ok(common_api_versions)
1733 }
1734
1735 pub async fn get_metadata(&self) -> Metadata {
1737 self.db
1738 .begin_transaction_nc()
1739 .await
1740 .get_value(&ClientMetadataKey)
1741 .await
1742 .unwrap_or_else(|| {
1743 warn!("Missing existing metadata. This key should have been set on Client init");
1744 Metadata::empty()
1745 })
1746 }
1747
1748 pub async fn set_metadata(&self, metadata: &Metadata) {
1750 self.db
1751 .autocommit::<_, _, anyhow::Error>(
1752 |dbtx, _| {
1753 Box::pin(async {
1754 Self::set_metadata_dbtx(dbtx, metadata).await;
1755 Ok(())
1756 })
1757 },
1758 None,
1759 )
1760 .await
1761 .expect("Failed to autocommit metadata");
1762 }
1763
1764 pub fn has_pending_recoveries(&self) -> bool {
1765 !self
1766 .client_recovery_progress_receiver
1767 .borrow()
1768 .iter()
1769 .all(|(_id, progress)| progress.is_done())
1770 }
1771
1772 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1780 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1781 recovery_receiver
1782 .wait_for(|in_progress| {
1783 in_progress
1784 .iter()
1785 .all(|(_id, progress)| progress.is_done())
1786 })
1787 .await
1788 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1789
1790 Ok(())
1791 }
1792
1793 pub fn subscribe_to_recovery_progress(
1798 &self,
1799 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
1800 WatchStream::new(self.client_recovery_progress_receiver.clone())
1801 .flat_map(futures::stream::iter)
1802 }
1803
1804 pub async fn wait_for_module_kind_recovery(
1805 &self,
1806 module_kind: ModuleKind,
1807 ) -> anyhow::Result<()> {
1808 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1809 let config = self.config().await;
1810 recovery_receiver
1811 .wait_for(|in_progress| {
1812 !in_progress
1813 .iter()
1814 .filter(|(module_instance_id, _progress)| {
1815 config.modules[module_instance_id].kind == module_kind
1816 })
1817 .any(|(_id, progress)| !progress.is_done())
1818 })
1819 .await
1820 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1821
1822 Ok(())
1823 }
1824
1825 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1826 loop {
1827 if self.executor.get_active_states().await.is_empty() {
1828 break;
1829 }
1830 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1831 }
1832 Ok(())
1833 }
1834
    /// Writes `metadata` under `ClientMetadataKey` in `dbtx` using
    /// `insert_new_entry`.
    // NOTE(review): `insert_new_entry` suggests the key is expected to be absent;
    // confirm how it behaves when metadata is already stored.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1839
    /// Spawns `run_module_recoveries_task` on the client's task group under the
    /// name "module recoveries".
    ///
    /// The task receives a clone of the database and of the log ordering
    /// wake-up sender, together with the given recovery futures and progress
    /// receivers.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }
1866
1867 async fn run_module_recoveries_task(
1868 db: Database,
1869 log_ordering_wakeup_tx: watch::Sender<()>,
1870 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1871 module_recoveries: BTreeMap<
1872 ModuleInstanceId,
1873 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1874 >,
1875 module_recovery_progress_receivers: BTreeMap<
1876 ModuleInstanceId,
1877 watch::Receiver<RecoveryProgress>,
1878 >,
1879 ) {
1880 debug!(target:LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1881 let mut completed_stream = Vec::new();
1882 let progress_stream = futures::stream::FuturesUnordered::new();
1883
1884 for (module_instance_id, f) in module_recoveries {
1885 completed_stream.push(futures::stream::once(Box::pin(async move {
1886 match f.await {
1887 Ok(()) => (module_instance_id, None),
1888 Err(err) => {
1889 warn!(%err, module_instance_id, "Module recovery failed");
1890 futures::future::pending::<()>().await;
1894 unreachable!()
1895 }
1896 }
1897 })));
1898 }
1899
1900 for (module_instance_id, rx) in module_recovery_progress_receivers {
1901 progress_stream.push(
1902 tokio_stream::wrappers::WatchStream::new(rx)
1903 .fuse()
1904 .map(move |progress| (module_instance_id, Some(progress))),
1905 );
1906 }
1907
1908 let mut futures = futures::stream::select(
1909 futures::stream::select_all(progress_stream),
1910 futures::stream::select_all(completed_stream),
1911 );
1912
1913 while let Some((module_instance_id, progress)) = futures.next().await {
1914 let mut dbtx = db.begin_transaction().await;
1915
1916 let prev_progress = *recovery_sender
1917 .borrow()
1918 .get(&module_instance_id)
1919 .expect("existing progress must be present");
1920
1921 let progress = if prev_progress.is_done() {
1922 prev_progress
1924 } else if let Some(progress) = progress {
1925 progress
1926 } else {
1927 prev_progress.to_complete()
1928 };
1929
1930 if !prev_progress.is_done() && progress.is_done() {
1931 info!(
1932 module_instance_id,
1933 prev_progress = format!("{}/{}", prev_progress.complete, prev_progress.total),
1934 progress = format!("{}/{}", progress.complete, progress.total),
1935 "Recovery complete"
1936 );
1937 dbtx.log_event(
1938 log_ordering_wakeup_tx.clone(),
1939 None,
1940 ModuleRecoveryCompleted {
1941 module_id: module_instance_id,
1942 },
1943 )
1944 .await;
1945 } else {
1946 info!(
1947 module_instance_id,
1948 prev_progress = format!("{}/{}", prev_progress.complete, prev_progress.total),
1949 progress = format!("{}/{}", progress.complete, progress.total),
1950 "Recovery progress"
1951 );
1952 }
1953
1954 dbtx.insert_entry(
1955 &ClientModuleRecovery { module_instance_id },
1956 &ClientModuleRecoveryState { progress },
1957 )
1958 .await;
1959 dbtx.commit_tx().await;
1960
1961 recovery_sender.send_modify(|v| {
1962 v.insert(module_instance_id, progress);
1963 });
1964 }
1965 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1966 }
1967
1968 async fn load_peers_last_api_versions(
1969 db: &Database,
1970 num_peers: NumPeers,
1971 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1972 let mut peer_api_version_sets = BTreeMap::new();
1973
1974 let mut dbtx = db.begin_transaction_nc().await;
1975 for peer_id in num_peers.peer_ids() {
1976 if let Some(v) = dbtx
1977 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1978 .await
1979 {
1980 peer_api_version_sets.insert(peer_id, v.0);
1981 }
1982 }
1983 drop(dbtx);
1984 peer_api_version_sets
1985 }
1986
    /// Returns all entries stored under `ApiAnnouncementPrefix`, keyed by the
    /// first field of each announcement key.
    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ApiAnnouncementPrefix)
            .await
            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
            .collect()
            .await
    }
1999
2000 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
2002 get_api_urls(&self.db, &self.config().await).await
2003 }
2004
2005 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2008 self.get_peer_urls()
2009 .await
2010 .into_iter()
2011 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
2012 .map(|peer_url| {
2013 InviteCode::new(
2014 peer_url.clone(),
2015 peer,
2016 self.federation_id(),
2017 self.api_secret.clone(),
2018 )
2019 })
2020 }
2021
    /// Returns the guardians' broadcast public keys, fetching them from the
    /// federation when the local config does not contain them.
    ///
    /// If the current config has `broadcast_public_keys`, they are returned as
    /// is. Otherwise the client config is requested from the federation
    /// (retried with `backoff_util::background_backoff()`), and a config
    /// extended with the fetched keys is written under `ClientConfigKey` and
    /// also replaces the in-memory config.
    ///
    /// If the fetched config has no keys either, a warning is logged and this
    /// future never resolves.
    ///
    /// # Panics
    /// Panics if the retry helper or autocommit returns an error; the `expect`
    /// messages state that this is not expected to happen.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db.autocommit(|dbtx, _| Box::pin(async move {
            let config = self.config().await;

            let guardian_pub_keys = if let Some(guardian_pub_keys) = config.global.broadcast_public_keys {guardian_pub_keys}else{
                let fetched_config = retry(
                    "Fetching guardian public keys",
                    backoff_util::background_backoff(),
                    || async {
                        Ok(self.api.request_current_consensus::<ClientConfig>(
                            CLIENT_CONFIG_ENDPOINT.to_owned(),
                            ApiRequestErased::default(),
                        ).await?)
                    },
                )
                .await
                .expect("Will never return on error");

                let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
                    warn!("Guardian public keys not found in fetched config, server not updated to 0.4 yet");
                    // Block forever instead of returning without keys.
                    pending::<()>().await;
                    unreachable!("Pending will never return");
                };

                // Same config as before, with only the public keys filled in.
                let new_config = ClientConfig {
                    global: GlobalClientConfig {
                        broadcast_public_keys: Some(guardian_pub_keys.clone()),
                        ..config.global
                    },
                    modules: config.modules,
                };

                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
                // NOTE(review): the in-memory config is replaced inside the
                // autocommit closure, i.e. before the commit outcome is known.
                *(self.config.write().await) = new_config;
                guardian_pub_keys
            };

            Result::<_, ()>::Ok(guardian_pub_keys)
        }), None).await.expect("Will retry forever")
    }
2067
    /// Dispatches a client-level RPC call by method name and returns its
    /// results as a stream of JSON values.
    ///
    /// One-shot methods yield exactly one value; the `subscribe_*` methods
    /// yield one value per item of the underlying stream. `params` is only
    /// read by `get_invite_code`, which deserializes it into
    /// `GetInviteCodeRequest`.
    ///
    /// An unknown method name, a failed serialization/deserialization, or a
    /// failure of the called method makes the stream yield an error.
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance().await;
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let mut stream = self.subscribe_balance_changes().await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "list_operations" => {
                    // Requests up to `usize::MAX` operations with no start key.
                    let operations = self.operation_log().list_operations(usize::MAX, None).await;
                    yield serde_json::to_value(operations)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                _ => {
                    // The `?` always returns the error, so the next line is
                    // never reached.
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }
2127
    /// Logs `event` (optionally attributed to `module_id`) in its own database
    /// transaction, which is committed before returning.
    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }
2136
2137 pub async fn log_event_dbtx<E, Cap>(
2138 &self,
2139 dbtx: &mut DatabaseTransaction<'_, Cap>,
2140 module_id: Option<ModuleInstanceId>,
2141 event: E,
2142 ) where
2143 E: Event + Send,
2144 Cap: Send,
2145 {
2146 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
2147 .await;
2148 }
2149
2150 pub async fn log_event_raw_dbtx<Cap>(
2151 &self,
2152 dbtx: &mut DatabaseTransaction<'_, Cap>,
2153 kind: EventKind,
2154 module: Option<(ModuleKind, ModuleInstanceId)>,
2155 payload: Vec<u8>,
2156 transient: bool,
2157 ) where
2158 Cap: Send,
2159 {
2160 let module_id = module.as_ref().map(|m| m.1);
2161 let module_kind = module.map(|m| m.0);
2162 dbtx.log_event_raw(
2163 self.log_ordering_wakeup_tx.clone(),
2164 kind,
2165 module_kind,
2166 module_id,
2167 payload,
2168 transient,
2169 )
2170 .await;
2171 }
2172
2173 pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
2174 where
2175 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
2176 K: DatabaseRecord<Value = EventLogId>,
2177 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2178 R: Future<Output = anyhow::Result<()>>,
2179 {
2180 event_log::handle_events(
2181 self.db.clone(),
2182 pos_key,
2183 self.log_event_added_rx.clone(),
2184 call_fn,
2185 )
2186 .await
2187 }
2188
2189 pub async fn get_event_log(
2190 &self,
2191 pos: Option<EventLogId>,
2192 limit: u64,
2193 ) -> Vec<(
2194 EventLogId,
2195 EventKind,
2196 Option<(ModuleKind, ModuleInstanceId)>,
2197 u64,
2198 serde_json::Value,
2199 )> {
2200 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2201 .await
2202 }
2203
    /// Reads event log entries using the caller's transaction by delegating to
    /// `dbtx.get_event_log(pos, limit)`.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<(
        EventLogId,
        EventKind,
        Option<(ModuleKind, ModuleInstanceId)>,
        u64,
        serde_json::Value,
    )>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2221
    /// Returns a new receiver subscribed to the transient event log broadcast
    /// channel.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2226}
2227
/// Parameters of the `get_invite_code` RPC method, deserialized from the JSON
/// `params` value.
#[derive(Deserialize)]
struct GetInviteCodeRequest {
    /// Peer the invite code should be created for.
    peer: PeerId,
}
2232
/// Wrapper around a stream of transaction submission state machine updates.
pub struct TransactionUpdates {
    /// Stream of `TxSubmissionStatesSM` values consumed by the methods of this
    /// type.
    update_stream: BoxStream<'static, TxSubmissionStatesSM>,
}
2237
2238impl TransactionUpdates {
2239 pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
2242 debug!(target: LOG_CLIENT, %await_txid, "Await tx accepted");
2243 self.update_stream
2244 .filter_map(|tx_update| {
2245 std::future::ready(match tx_update.state {
2246 TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
2247 TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
2248 Some(Err(submit_error))
2249 }
2250 _ => None,
2251 })
2252 })
2253 .next_or_pending()
2254 .await?;
2255 debug!(target: LOG_CLIENT, %await_txid, "Tx accepted");
2256 Ok(())
2257 }
2258}
2259
/// Admin credentials that can be handed to a [`ClientBuilder`] via
/// `set_admin_creds`.
pub struct AdminCreds {
    /// Peer id these credentials belong to (presumably the guardian's own id).
    pub peer_id: PeerId,
    /// API authentication value for that peer.
    pub auth: ApiAuth,
}
2267
/// Collects the settings needed to create, join, recover or open a client.
pub struct ClientBuilder {
    /// Registry of client module initializers.
    module_inits: ClientModuleInitRegistry,
    /// Primary module selected by instance id, if any.
    primary_module_instance: Option<ModuleInstanceId>,
    /// Primary module selected by kind, if any.
    primary_module_kind: Option<ModuleKind>,
    /// Optional admin credentials, set through `set_admin_creds`.
    admin_creds: Option<AdminCreds>,
    /// Database handle without module decoders attached.
    db_no_decoders: Database,
    /// Meta service handle; replaceable via `with_meta_service`.
    meta_service: Arc<MetaService>,
    /// Connector used when creating federation API clients.
    connector: Connector,
    /// Set by `stopped()`; passed on to `build` during initialization.
    stopped: bool,
    /// Sender side of the transient event log broadcast channel.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
}
2280
2281impl ClientBuilder {
2282 fn new(db: Database) -> Self {
2283 let meta_service = MetaService::new(LegacyMetaSource::default());
2284 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
2285 broadcast::channel(1024);
2286 ClientBuilder {
2287 module_inits: ModuleInitRegistry::new(),
2288 primary_module_instance: None,
2289 primary_module_kind: None,
2290 connector: Connector::default(),
2291 admin_creds: None,
2292 db_no_decoders: db,
2293 stopped: false,
2294 meta_service,
2295 log_event_added_transient_tx,
2296 }
2297 }
2298
2299 fn from_existing(client: &Client) -> Self {
2300 ClientBuilder {
2301 module_inits: client.module_inits.clone(),
2302 primary_module_instance: Some(client.primary_module_instance),
2303 primary_module_kind: None,
2304 admin_creds: None,
2305 db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
2306 stopped: false,
2307 meta_service: client.meta_service.clone(),
2309 connector: client.connector,
2310 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
2311 }
2312 }
2313
    /// Replaces the whole module init registry with `module_inits`.
    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
        self.module_inits = module_inits;
    }
2318
    /// Attaches a single module initializer to the builder's registry.
    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
        self.module_inits.attach(module_init);
    }
2323
    /// Sets the builder's `stopped` flag to `true`.
    pub fn stopped(&mut self) {
        self.stopped = true;
    }
2327
    /// Deprecated alias that forwards to `with_primary_module_instance_id`.
    #[deprecated(
        since = "0.6.0",
        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
    )]
    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
        self.with_primary_module_instance_id(primary_module_instance);
    }
2340
2341 pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
2355 let was_replaced = self
2356 .primary_module_instance
2357 .replace(primary_module_instance)
2358 .is_some();
2359 assert!(
2360 !was_replaced,
2361 "Only one primary module can be given to the builder."
2362 );
2363 }
2364
2365 pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
2371 let was_replaced = self
2372 .primary_module_kind
2373 .replace(primary_module_kind)
2374 .is_some();
2375 assert!(
2376 !was_replaced,
2377 "Only one primary module kind can be given to the builder."
2378 );
2379 }
2380
    /// Replaces the builder's meta service with `meta_service`.
    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
        self.meta_service = meta_service;
    }
2384
2385 async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> {
2386 if let Ok(client_config) = self.load_existing_config().await {
2390 for (module_id, module_cfg) in client_config.modules {
2391 let kind = module_cfg.kind.clone();
2392 let Some(init) = self.module_inits.get(&kind) else {
2393 continue;
2395 };
2396
2397 apply_migrations_client(
2398 db,
2399 kind.to_string(),
2400 init.get_database_migrations(),
2401 module_id,
2402 )
2403 .await?;
2404 }
2405 }
2406
2407 Ok(())
2408 }
2409
    /// Returns the builder's database handle (the one without module
    /// decoders).
    pub fn db_no_decoders(&self) -> &Database {
        &self.db_no_decoders
    }
2413
2414 pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
2415 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2416 bail!("Client database not initialized")
2417 };
2418
2419 Ok(config)
2420 }
2421
    /// Stores `creds` as the builder's admin credentials, replacing any
    /// previous value.
    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
        self.admin_creds = Some(creds);
    }
2425
    /// Sets the connector the builder uses.
    pub fn with_connector(&mut self, connector: Connector) {
        self.connector = connector;
    }
2429
    /// Switches the builder to `Connector::tor()`. Only available with the
    /// `tor` feature.
    #[cfg(feature = "tor")]
    pub fn with_tor_connector(&mut self) {
        self.with_connector(Connector::tor());
    }
2434
2435 async fn init(
2436 self,
2437 pre_root_secret: DerivableSecret,
2438 config: ClientConfig,
2439 api_secret: Option<String>,
2440 init_mode: InitMode,
2441 ) -> anyhow::Result<ClientHandle> {
2442 if Client::is_initialized(&self.db_no_decoders).await {
2443 bail!("Client database already initialized")
2444 }
2445
2446 {
2449 debug!(target: LOG_CLIENT, "Initializing client database");
2450 let mut dbtx = self.db_no_decoders.begin_transaction().await;
2451 dbtx.insert_new_entry(&ClientConfigKey, &config).await;
2453 dbtx.insert_entry(
2454 &ClientPreRootSecretHashKey,
2455 &pre_root_secret.derive_pre_root_secret_hash(),
2456 )
2457 .await;
2458
2459 if let Some(api_secret) = api_secret.as_ref() {
2460 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
2461 }
2462
2463 let init_state = InitState::Pending(init_mode);
2464 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
2465
2466 let metadata = init_state
2467 .does_require_recovery()
2468 .flatten()
2469 .map_or(Metadata::empty(), |s| s.metadata);
2470
2471 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
2472
2473 dbtx.commit_tx_result().await?;
2474 }
2475
2476 let stopped = self.stopped;
2477 self.build(pre_root_secret, config, api_secret, stopped)
2478 .await
2479 }
2480
2481 pub async fn join(
2555 self,
2556 pre_root_secret: DerivableSecret,
2557 config: ClientConfig,
2558 api_secret: Option<String>,
2559 ) -> anyhow::Result<ClientHandle> {
2560 self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
2561 .await
2562 }
2563
2564 pub async fn download_backup_from_federation(
2566 &self,
2567 root_secret: &DerivableSecret,
2568 config: &ClientConfig,
2569 api_secret: Option<String>,
2570 ) -> anyhow::Result<Option<ClientBackup>> {
2571 let connector = self.connector;
2572 let api = DynGlobalApi::from_endpoints(
2573 config
2575 .global
2576 .api_endpoints
2577 .iter()
2578 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
2579 &api_secret,
2580 &connector,
2581 );
2582 Client::download_backup_from_federation_static(
2583 &api,
2584 &Self::federation_root_secret(root_secret, config),
2585 &self.decoders(config),
2586 )
2587 .await
2588 }
2589
2590 pub async fn recover(
2601 self,
2602 root_secret: DerivableSecret,
2603 config: ClientConfig,
2604 api_secret: Option<String>,
2605 backup: Option<ClientBackup>,
2606 ) -> anyhow::Result<ClientHandle> {
2607 let client = self
2608 .init(
2609 root_secret,
2610 config,
2611 api_secret,
2612 InitMode::Recover {
2613 snapshot: backup.clone(),
2614 },
2615 )
2616 .await?;
2617
2618 Ok(client)
2619 }
2620
2621 pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
2622 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2623 bail!("Client database not initialized")
2624 };
2625
2626 if let Some(secret_hash) = self
2627 .db_no_decoders()
2628 .begin_transaction_nc()
2629 .await
2630 .get_value(&ClientPreRootSecretHashKey)
2631 .await
2632 {
2633 ensure!(
2634 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
2635 "Secret hash does not match. Incorrect secret"
2636 );
2637 } else {
2638 debug!(target: LOG_CLIENT, "Backfilling secret hash");
2639 let mut dbtx = self.db_no_decoders.begin_transaction().await;
2641 dbtx.insert_entry(
2642 &ClientPreRootSecretHashKey,
2643 &pre_root_secret.derive_pre_root_secret_hash(),
2644 )
2645 .await;
2646 dbtx.commit_tx().await;
2647 }
2648
2649 let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
2650 let stopped = self.stopped;
2651
2652 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2653 let client = self
2654 .build_stopped(
2655 pre_root_secret,
2656 &config,
2657 api_secret,
2658 log_event_added_transient_tx,
2659 )
2660 .await?;
2661 if !stopped {
2662 client.as_inner().start_executor();
2663 }
2664 Ok(client)
2665 }
2666
2667 async fn build(
2669 self,
2670 pre_root_secret: DerivableSecret,
2671 config: ClientConfig,
2672 api_secret: Option<String>,
2673 stopped: bool,
2674 ) -> anyhow::Result<ClientHandle> {
2675 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2676 let client = self
2677 .build_stopped(
2678 pre_root_secret,
2679 &config,
2680 api_secret,
2681 log_event_added_transient_tx,
2682 )
2683 .await?;
2684 if !stopped {
2685 client.as_inner().start_executor();
2686 }
2687
2688 Ok(client)
2689 }
2690
2691 async fn build_stopped(
2694 self,
2695 root_secret: DerivableSecret,
2696 config: &ClientConfig,
2697 api_secret: Option<String>,
2698 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
2699 ) -> anyhow::Result<ClientHandle> {
2700 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
2701 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
2702
2703 let decoders = self.decoders(config);
2704 let config = Self::config_decoded(config, &decoders)?;
2705 let fed_id = config.calculate_federation_id();
2706 let db = self.db_no_decoders.with_decoders(decoders.clone());
2707 let connector = self.connector;
2708 let peer_urls = get_api_urls(&db, &config).await;
2709 let api = if let Some(admin_creds) = self.admin_creds.as_ref() {
2710 WsFederationApi::new_admin(
2711 admin_creds.peer_id,
2712 peer_urls
2713 .into_iter()
2714 .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
2715 .context("Admin creds should match a peer")?,
2716 &api_secret,
2717 &connector,
2718 )
2719 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2720 .with_cache()
2721 .into()
2722 } else {
2723 WsFederationApi::from_endpoints(peer_urls, &api_secret, &connector)
2724 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2725 .with_cache()
2726 .into()
2727 };
2728 let task_group = TaskGroup::new();
2729
2730 self.migrate_database(&db).await?;
2733
2734 let init_state = Self::load_init_state(&db).await;
2735
2736 let primary_module_instance = self
2737 .primary_module_instance
2738 .or_else(|| {
2739 let primary_module_kind = self.primary_module_kind?;
2740 config
2741 .modules
2742 .iter()
2743 .find_map(|(module_instance_id, module_config)| {
2744 (module_config.kind() == &primary_module_kind)
2745 .then_some(*module_instance_id)
2746 })
2747 })
2748 .ok_or(anyhow!("No primary module set or found"))?;
2749
2750 let notifier = Notifier::new(db.clone());
2751
2752 let common_api_versions = Client::load_and_refresh_common_api_version_static(
2753 &config,
2754 &self.module_inits,
2755 &api,
2756 &db,
2757 &task_group,
2758 )
2759 .await
2760 .inspect_err(|err| {
2761 warn!(target: LOG_CLIENT, %err, "Failed to discover initial API version to use.");
2762 })
2763 .unwrap_or(ApiVersionSet {
2764 core: ApiVersion::new(0, 0),
2765 modules: BTreeMap::new(),
2767 });
2768
2769 debug!(?common_api_versions, "Completed api version negotiation");
2770
2771 let mut module_recoveries: BTreeMap<
2772 ModuleInstanceId,
2773 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
2774 > = BTreeMap::new();
2775 let mut module_recovery_progress_receivers: BTreeMap<
2776 ModuleInstanceId,
2777 watch::Receiver<RecoveryProgress>,
2778 > = BTreeMap::new();
2779
2780 let final_client = FinalClient::default();
2781
2782 let root_secret = Self::federation_root_secret(&root_secret, &config);
2783
2784 let modules = {
2785 let mut modules = ClientModuleRegistry::default();
2786 for (module_instance_id, module_config) in config.modules.clone() {
2787 let kind = module_config.kind().clone();
2788 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
2789 debug!("Module kind {kind} of instance {module_instance_id} not found in module gens, skipping");
2790 continue;
2791 };
2792
2793 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
2794 else {
2795 warn!("Module kind {kind} of instance {module_instance_id} has not compatible api version, skipping");
2796 continue;
2797 };
2798
2799 let start_module_recover_fn =
2802 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
2803 let module_config = module_config.clone();
2804 let num_peers = NumPeers::from(config.global.api_endpoints.len());
2805 let db = db.clone();
2806 let kind = kind.clone();
2807 let notifier = notifier.clone();
2808 let api = api.clone();
2809 let root_secret = root_secret.clone();
2810 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
2811 let final_client = final_client.clone();
2812 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
2813 let task_group = task_group.clone();
2814 let module_init = module_init.clone();
2815 (
2816 Box::pin(async move {
2817 module_init
2818 .recover(
2819 final_client.clone(),
2820 fed_id,
2821 num_peers,
2822 module_config.clone(),
2823 db.clone(),
2824 module_instance_id,
2825 common_api_versions.core,
2826 api_version,
2827 root_secret.derive_module_secret(module_instance_id),
2828 notifier.clone(),
2829 api.clone(),
2830 admin_auth,
2831 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
2832 progress_tx,
2833 task_group,
2834 )
2835 .await
2836 .map_err(|err| {
2837 warn!(
2838 module_id = module_instance_id, %kind, %err, "Module failed to recover"
2839 );
2840 err
2841 })
2842 }),
2843 progress_rx,
2844 )
2845 };
2846
2847 let recovery = if let Some(snapshot) = init_state.does_require_recovery() {
2848 if let Some(module_recovery_state) = db
2849 .begin_transaction_nc()
2850 .await
2851 .get_value(&ClientModuleRecovery { module_instance_id })
2852 .await
2853 {
2854 if module_recovery_state.is_done() {
2855 debug!(
2856 id = %module_instance_id,
2857 %kind, "Module recovery already complete"
2858 );
2859 None
2860 } else {
2861 debug!(
2862 id = %module_instance_id,
2863 %kind,
2864 progress = %module_recovery_state.progress,
2865 "Starting module recovery with an existing progress"
2866 );
2867 Some(start_module_recover_fn(
2868 snapshot,
2869 module_recovery_state.progress,
2870 ))
2871 }
2872 } else {
2873 let progress = RecoveryProgress::none();
2874 let mut dbtx = db.begin_transaction().await;
2875 dbtx.log_event(
2876 log_ordering_wakeup_tx.clone(),
2877 None,
2878 ModuleRecoveryStarted {
2879 module_id: module_instance_id,
2880 },
2881 )
2882 .await;
2883 dbtx.insert_entry(
2884 &ClientModuleRecovery { module_instance_id },
2885 &ClientModuleRecoveryState { progress },
2886 )
2887 .await;
2888
2889 dbtx.commit_tx().await;
2890
2891 debug!(
2892 id = %module_instance_id,
2893 %kind, "Starting new module recovery"
2894 );
2895 Some(start_module_recover_fn(snapshot, progress))
2896 }
2897 } else {
2898 None
2899 };
2900
2901 if let Some((recovery, recovery_progress_rx)) = recovery {
2902 module_recoveries.insert(module_instance_id, recovery);
2903 module_recovery_progress_receivers
2904 .insert(module_instance_id, recovery_progress_rx);
2905 } else {
2906 let module = module_init
2907 .init(
2908 final_client.clone(),
2909 fed_id,
2910 config.global.api_endpoints.len(),
2911 module_config,
2912 db.clone(),
2913 module_instance_id,
2914 common_api_versions.core,
2915 api_version,
2916 root_secret.derive_module_secret(module_instance_id),
2921 notifier.clone(),
2922 api.clone(),
2923 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
2924 task_group.clone(),
2925 )
2926 .await?;
2927
2928 if primary_module_instance == module_instance_id
2929 && !module.supports_being_primary()
2930 {
2931 bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module");
2932 }
2933
2934 modules.register_module(module_instance_id, kind, module);
2935 }
2936 }
2937 modules
2938 };
2939
2940 if init_state.is_pending() && module_recoveries.is_empty() {
2941 let mut dbtx = db.begin_transaction().await;
2942 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
2943 .await;
2944 dbtx.commit_tx().await;
2945 }
2946
2947 let executor = {
2948 let mut executor_builder = Executor::builder();
2949 executor_builder
2950 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
2951
2952 for (module_instance_id, _, module) in modules.iter_modules() {
2953 executor_builder.with_module_dyn(module.context(module_instance_id));
2954 }
2955
2956 for module_instance_id in module_recoveries.keys() {
2957 executor_builder.with_valid_module_id(*module_instance_id);
2958 }
2959
2960 executor_builder.build(db.clone(), notifier, task_group.clone())
2961 };
2962
2963 let recovery_receiver_init_val = module_recovery_progress_receivers
2964 .iter()
2965 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
2966 .collect::<BTreeMap<_, _>>();
2967 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
2968 watch::channel(recovery_receiver_init_val);
2969
2970 let client_inner = Arc::new(Client {
2971 config: RwLock::new(config.clone()),
2972 api_secret,
2973 decoders,
2974 db: db.clone(),
2975 federation_id: fed_id,
2976 federation_meta: config.global.meta,
2977 primary_module_instance,
2978 modules,
2979 module_inits: self.module_inits.clone(),
2980 log_ordering_wakeup_tx,
2981 log_event_added_rx,
2982 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
2983 executor,
2984 api,
2985 secp_ctx: Secp256k1::new(),
2986 root_secret,
2987 task_group,
2988 operation_log: OperationLog::new(db.clone()),
2989 client_recovery_progress_receiver,
2990 meta_service: self.meta_service,
2991 connector,
2992 });
2993 client_inner
2994 .task_group
2995 .spawn_cancellable("MetaService::update_continuously", {
2996 let client_inner = client_inner.clone();
2997 async move {
2998 client_inner
2999 .meta_service
3000 .update_continuously(&client_inner)
3001 .await;
3002 }
3003 });
3004
3005 client_inner.task_group.spawn_cancellable(
3006 "update-api-announcements",
3007 run_api_announcement_sync(client_inner.clone()),
3008 );
3009
3010 client_inner.task_group.spawn_cancellable(
3011 "event log ordering task",
3012 run_event_log_ordering_task(
3013 db.clone(),
3014 log_ordering_wakeup_rx,
3015 log_event_added_tx,
3016 log_event_added_transient_tx,
3017 ),
3018 );
3019 let client_arc = ClientHandle::new(client_inner);
3020
3021 for (_, _, module) in client_arc.modules.iter_modules() {
3022 module.start().await;
3023 }
3024
3025 final_client.set(client_arc.downgrade());
3026
3027 if !module_recoveries.is_empty() {
3028 client_arc.spawn_module_recoveries_task(
3029 client_recovery_progress_sender,
3030 module_recoveries,
3031 module_recovery_progress_receivers,
3032 );
3033 }
3034
3035 Ok(client_arc)
3036 }
3037
3038 async fn load_init_state(db: &Database) -> InitState {
3039 let mut dbtx = db.begin_transaction_nc().await;
3040 dbtx.get_value(&ClientInitStateKey)
3041 .await
3042 .unwrap_or_else(|| {
3043 warn!("Client missing ClientRequiresRecovery: assuming complete");
3046 db::InitState::Complete(db::InitModeComplete::Fresh)
3047 })
3048 }
3049
3050 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
3051 let mut decoders = client_decoders(
3052 &self.module_inits,
3053 config
3054 .modules
3055 .iter()
3056 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
3057 );
3058
3059 decoders.register_module(
3060 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
3061 ModuleKind::from_static_str("tx_submission"),
3062 tx_submission_sm_decoder(),
3063 );
3064
3065 decoders
3066 }
3067
3068 fn config_decoded(
3069 config: &ClientConfig,
3070 decoders: &ModuleDecoderRegistry,
3071 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
3072 config.clone().redecode_raw(decoders)
3073 }
3074
3075 fn federation_root_secret(
3079 root_secret: &DerivableSecret,
3080 config: &ClientConfig,
3081 ) -> DerivableSecret {
3082 root_secret.federation_key(&config.global.calculate_federation_id())
3083 }
3084
3085 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
3087 self.log_event_added_transient_tx.subscribe()
3088 }
3089}
3090
3091pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
3095 let mut tx = db.begin_transaction_nc().await;
3096 let client_secret = tx.get_value(&EncodedClientSecretKey).await;
3097
3098 match client_secret {
3099 Some(client_secret) => {
3100 T::consensus_decode(&mut client_secret.as_slice(), &ModuleRegistry::default())
3101 .map_err(|e| anyhow!("Decoding failed: {e}"))
3102 }
3103 None => bail!("Encoded client secret not present in DB"),
3104 }
3105}
3106
3107pub fn client_decoders<'a>(
3108 registry: &ModuleInitRegistry<DynClientModuleInit>,
3109 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
3110) -> ModuleDecoderRegistry {
3111 let mut modules = BTreeMap::new();
3112 for (id, kind) in module_kinds {
3113 let Some(init) = registry.get(kind) else {
3114 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
3115 continue;
3116 };
3117
3118 modules.insert(
3119 id,
3120 (
3121 kind.clone(),
3122 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
3123 ),
3124 );
3125 }
3126 ModuleDecoderRegistry::from(modules)
3127}