1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::explicit_deref_methods)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::needless_lifetimes)]
10#![allow(clippy::return_self_not_must_use)]
11#![allow(clippy::too_many_lines)]
12#![allow(clippy::type_complexity)]
13
14use std::collections::{BTreeMap, HashSet};
84use std::fmt::{Debug, Formatter};
85use std::future::pending;
86use std::ops::{self, Range};
87use std::pin::Pin;
88use std::sync::Arc;
89use std::time::{Duration, SystemTime, UNIX_EPOCH};
90
91use anyhow::{anyhow, bail, ensure, format_err, Context};
92use api::ClientRawFederationApiExt as _;
93use async_stream::{stream, try_stream};
94use backup::ClientBackup;
95use bitcoin::secp256k1;
96use db::{
97 apply_migrations_client, apply_migrations_core_client, get_core_client_database_migrations,
98 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientInitStateKey,
99 ClientModuleRecovery, ClientPreRootSecretHashKey, EncodedClientSecretKey, InitMode,
100 PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey,
101};
102use fedimint_api_client::api::net::Connector;
103use fedimint_api_client::api::{
104 ApiVersionSet, DynGlobalApi, DynModuleApi, FederationApiExt, GlobalFederationApiWithCacheExt,
105 IGlobalFederationApi, ReconnectFederationApi,
106};
107use fedimint_core::config::{
108 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
109};
110use fedimint_core::core::{
111 DynInput, DynOutput, IInput, IOutput, IntoDynInstance as _, ModuleInstanceId, ModuleKind,
112 OperationId,
113};
114use fedimint_core::db::{
115 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
116 IDatabaseTransactionOpsCoreTyped, NonCommittable,
117};
118use fedimint_core::encoding::{Decodable, Encodable};
119use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
120use fedimint_core::invite_code::InviteCode;
121use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
122use fedimint_core::module::{
123 ApiAuth, ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
124 SupportedCoreApiVersions, SupportedModuleApiVersions,
125};
126use fedimint_core::net::api_announcement::SignedApiAnnouncement;
127use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
128use fedimint_core::transaction::Transaction;
129use fedimint_core::util::{
130 backoff_util, retry, BoxStream, FmtCompact as _, FmtCompactAnyhow as _, NextOrPending, SafeUrl,
131};
132use fedimint_core::{
133 apply, async_trait_maybe_send, dyn_newtype_define, fedimint_build_code_version_env,
134 maybe_add_send, maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
135 TransactionId,
136};
137pub use fedimint_derive_secret as derivable_secret;
138use fedimint_derive_secret::DerivableSecret;
139use fedimint_eventlog::{
140 self, run_event_log_ordering_task, DBTransactionEventLogExt, Event, EventKind, EventLogEntry,
141 EventLogId,
142};
143use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
144use futures::stream::FuturesUnordered;
145use futures::{Future, Stream, StreamExt};
146use meta::{LegacyMetaSource, MetaService};
147use module::recovery::RecoveryProgress;
148use module::{ClientContextIface, DynClientModule, FinalClientIface, IdxRange, OutPointRange};
149use rand::thread_rng;
150use secp256k1::{PublicKey, Secp256k1};
151use secret::{DeriveableSecretClientExt, PlainRootSecretStrategy, RootSecretStrategy as _};
152use serde::{Deserialize, Serialize};
153use thiserror::Error;
154#[cfg(not(target_family = "wasm"))]
155use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
156use tokio::sync::{broadcast, watch, RwLock};
157use tokio_stream::wrappers::WatchStream;
158use tracing::{debug, error, info, trace, warn};
159use transaction::{
160 ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputSM, TxSubmissionStatesSM,
161};
162
163use crate::api_announcements::{get_api_urls, run_api_announcement_sync, ApiAnnouncementPrefix};
164use crate::api_version_discovery::discover_common_api_versions_set;
165use crate::backup::Metadata;
166use crate::db::{ClientMetadataKey, ClientModuleRecoveryState, InitState, OperationLogKey};
167use crate::module::init::{
168 ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
169};
170use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
171use crate::oplog::OperationLog;
172use crate::sm::executor::{
173 ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
174};
175use crate::sm::{ClientSMDatabaseTransaction, DynState, Executor, IState, Notifier, State};
176use crate::transaction::{
177 tx_submission_sm_decoder, ClientInput, ClientOutputBundle, TransactionBuilder,
178 TxSubmissionContext, TxSubmissionStates, TRANSACTION_SUBMISSION_MODULE_INSTANCE,
179};
180
181pub mod api;
182
183pub mod backup;
185pub mod db;
187pub mod envs;
189pub mod module;
191pub mod oplog;
193pub mod secret;
195pub mod sm;
197pub mod transaction;
199
200mod api_version_discovery;
201
202pub mod api_announcements;
203pub mod meta;
205
206#[derive(Serialize, Deserialize)]
207pub struct TxCreatedEvent {
208 txid: TransactionId,
209 operation_id: OperationId,
210}
211
212impl Event for TxCreatedEvent {
213 const MODULE: Option<ModuleKind> = None;
214
215 const KIND: EventKind = EventKind::from_static("tx-created");
216}
217
218#[derive(Serialize, Deserialize)]
219pub struct TxAcceptedEvent {
220 txid: TransactionId,
221 operation_id: OperationId,
222}
223
224impl Event for TxAcceptedEvent {
225 const MODULE: Option<ModuleKind> = None;
226
227 const KIND: EventKind = EventKind::from_static("tx-accepted");
228}
229
230#[derive(Serialize, Deserialize)]
231pub struct TxRejectedEvent {
232 txid: TransactionId,
233 error: String,
234 operation_id: OperationId,
235}
236impl Event for TxRejectedEvent {
237 const MODULE: Option<ModuleKind> = None;
238
239 const KIND: EventKind = EventKind::from_static("tx-rejected");
240}
241
242#[derive(Serialize, Deserialize)]
243pub struct ModuleRecoveryStarted {
244 module_id: ModuleInstanceId,
245}
246
247impl Event for ModuleRecoveryStarted {
248 const MODULE: Option<ModuleKind> = None;
249
250 const KIND: EventKind = EventKind::from_static("module-recovery-started");
251}
252
253#[derive(Serialize, Deserialize)]
254pub struct ModuleRecoveryCompleted {
255 module_id: ModuleInstanceId,
256}
257
258impl Event for ModuleRecoveryCompleted {
259 const MODULE: Option<ModuleKind> = None;
260
261 const KIND: EventKind = EventKind::from_static("module-recovery-completed");
262}
263
264pub type InstancelessDynClientInput = ClientInput<Box<maybe_add_send_sync!(dyn IInput + 'static)>>;
265
266pub type InstancelessDynClientInputSM =
267 ClientInputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
268
269pub type InstancelessDynClientInputBundle = ClientInputBundle<
270 Box<maybe_add_send_sync!(dyn IInput + 'static)>,
271 Box<maybe_add_send_sync!(dyn IState + 'static)>,
272>;
273
274pub type InstancelessDynClientOutput =
275 ClientOutput<Box<maybe_add_send_sync!(dyn IOutput + 'static)>>;
276
277pub type InstancelessDynClientOutputSM =
278 ClientOutputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
279pub type InstancelessDynClientOutputBundle = ClientOutputBundle<
280 Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
281 Box<maybe_add_send_sync!(dyn IState + 'static)>,
282>;
283
284#[derive(Debug, Error)]
285pub enum AddStateMachinesError {
286 #[error("State already exists in database")]
287 StateAlreadyExists,
288 #[error("Got {0}")]
289 Other(#[from] anyhow::Error),
290}
291
292pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
293
294#[apply(async_trait_maybe_send!)]
295pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
296 fn module_api(&self) -> DynModuleApi;
299
300 async fn client_config(&self) -> ClientConfig;
301
302 fn api(&self) -> &DynGlobalApi;
308
309 fn decoders(&self) -> &ModuleDecoderRegistry;
310
311 async fn claim_inputs_dyn(
316 &self,
317 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
318 inputs: InstancelessDynClientInputBundle,
319 ) -> anyhow::Result<OutPointRange>;
320
321 async fn fund_output_dyn(
326 &self,
327 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
328 outputs: InstancelessDynClientOutputBundle,
329 ) -> anyhow::Result<OutPointRange>;
330
331 async fn add_state_machine_dyn(
333 &self,
334 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
335 sm: Box<maybe_add_send_sync!(dyn IState)>,
336 ) -> AddStateMachinesResult;
337
338 async fn log_event_json(
339 &self,
340 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
341 kind: EventKind,
342 module: Option<(ModuleKind, ModuleInstanceId)>,
343 payload: serde_json::Value,
344 persist: bool,
345 );
346
347 async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM>;
348}
349
350#[apply(async_trait_maybe_send!)]
351impl IGlobalClientContext for () {
352 fn module_api(&self) -> DynModuleApi {
353 unimplemented!("fake implementation, only for tests");
354 }
355
356 async fn client_config(&self) -> ClientConfig {
357 unimplemented!("fake implementation, only for tests");
358 }
359
360 fn api(&self) -> &DynGlobalApi {
361 unimplemented!("fake implementation, only for tests");
362 }
363
364 fn decoders(&self) -> &ModuleDecoderRegistry {
365 unimplemented!("fake implementation, only for tests");
366 }
367
368 async fn claim_inputs_dyn(
369 &self,
370 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
371 _input: InstancelessDynClientInputBundle,
372 ) -> anyhow::Result<OutPointRange> {
373 unimplemented!("fake implementation, only for tests");
374 }
375
376 async fn fund_output_dyn(
377 &self,
378 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
379 _outputs: InstancelessDynClientOutputBundle,
380 ) -> anyhow::Result<OutPointRange> {
381 unimplemented!("fake implementation, only for tests");
382 }
383
384 async fn add_state_machine_dyn(
385 &self,
386 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
387 _sm: Box<maybe_add_send_sync!(dyn IState)>,
388 ) -> AddStateMachinesResult {
389 unimplemented!("fake implementation, only for tests");
390 }
391
392 async fn log_event_json(
393 &self,
394 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
395 _kind: EventKind,
396 _module: Option<(ModuleKind, ModuleInstanceId)>,
397 _payload: serde_json::Value,
398 _persist: bool,
399 ) {
400 unimplemented!("fake implementation, only for tests");
401 }
402
403 async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
404 unimplemented!("fake implementation, only for tests");
405 }
406}
407
408dyn_newtype_define! {
409 #[derive(Clone)]
412 pub DynGlobalClientContext(Arc<IGlobalClientContext>)
413}
414
415impl DynGlobalClientContext {
416 pub fn new_fake() -> Self {
417 DynGlobalClientContext::from(())
418 }
419
420 pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
421 self.transaction_update_stream()
422 .await
423 .filter_map(|tx_update| {
424 std::future::ready(match tx_update.state {
425 TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
426 TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
427 Some(Err(submit_error))
428 }
429 _ => None,
430 })
431 })
432 .next_or_pending()
433 .await
434 }
435
436 pub async fn claim_inputs<I, S>(
437 &self,
438 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
439 inputs: ClientInputBundle<I, S>,
440 ) -> anyhow::Result<OutPointRange>
441 where
442 I: IInput + MaybeSend + MaybeSync + 'static,
443 S: IState + MaybeSend + MaybeSync + 'static,
444 {
445 self.claim_inputs_dyn(dbtx, inputs.into_instanceless())
446 .await
447 }
448
449 pub async fn fund_output<O, S>(
458 &self,
459 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
460 outputs: ClientOutputBundle<O, S>,
461 ) -> anyhow::Result<OutPointRange>
462 where
463 O: IOutput + MaybeSend + MaybeSync + 'static,
464 S: IState + MaybeSend + MaybeSync + 'static,
465 {
466 self.fund_output_dyn(dbtx, outputs.into_instanceless())
467 .await
468 }
469
470 pub async fn add_state_machine<S>(
474 &self,
475 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
476 sm: S,
477 ) -> AddStateMachinesResult
478 where
479 S: State + MaybeSend + MaybeSync + 'static,
480 {
481 self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
482 }
483
484 async fn log_event<E>(&self, dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, event: E)
485 where
486 E: Event + Send,
487 {
488 self.log_event_json(
489 dbtx,
490 E::KIND,
491 E::MODULE.map(|m| (m, dbtx.module_id())),
492 serde_json::to_value(&event).expect("Payload serialization can't fail"),
493 <E as Event>::PERSIST,
494 )
495 .await;
496 }
497}
498
499fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>(
500 state_gen: StateGenerator<S>,
501) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> {
502 Arc::new(move |out_point_range| {
503 let states: Vec<S> = state_gen(out_point_range);
504 states
505 .into_iter()
506 .map(|state| box_up_state(state))
507 .collect()
508 })
509}
510
511fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> {
514 Box::new(state)
515}
516
517impl<T> From<Arc<T>> for DynGlobalClientContext
518where
519 T: IGlobalClientContext,
520{
521 fn from(inner: Arc<T>) -> Self {
522 DynGlobalClientContext { inner }
523 }
524}
525
526impl Debug for Client {
528 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
529 write!(f, "Client")
530 }
531}
532
533#[derive(Clone, Debug)]
537struct ModuleGlobalClientContext {
538 client: Arc<Client>,
539 module_instance_id: ModuleInstanceId,
540 operation: OperationId,
541}
542
543#[apply(async_trait_maybe_send!)]
544impl IGlobalClientContext for ModuleGlobalClientContext {
545 fn module_api(&self) -> DynModuleApi {
546 self.api().with_module(self.module_instance_id)
547 }
548
549 fn api(&self) -> &DynGlobalApi {
550 &self.client.api
551 }
552
553 fn decoders(&self) -> &ModuleDecoderRegistry {
554 self.client.decoders()
555 }
556
557 async fn client_config(&self) -> ClientConfig {
558 self.client.config().await
559 }
560
561 async fn claim_inputs_dyn(
562 &self,
563 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
564 inputs: InstancelessDynClientInputBundle,
565 ) -> anyhow::Result<OutPointRange> {
566 let tx_builder =
567 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
568
569 self.client
570 .finalize_and_submit_transaction_inner(
571 &mut dbtx.global_tx().to_ref_nc(),
572 self.operation,
573 tx_builder,
574 )
575 .await
576 }
577
578 async fn fund_output_dyn(
579 &self,
580 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
581 outputs: InstancelessDynClientOutputBundle,
582 ) -> anyhow::Result<OutPointRange> {
583 let tx_builder =
584 TransactionBuilder::new().with_outputs(outputs.into_dyn(self.module_instance_id));
585
586 self.client
587 .finalize_and_submit_transaction_inner(
588 &mut dbtx.global_tx().to_ref_nc(),
589 self.operation,
590 tx_builder,
591 )
592 .await
593 }
594
595 async fn add_state_machine_dyn(
596 &self,
597 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
598 sm: Box<maybe_add_send_sync!(dyn IState)>,
599 ) -> AddStateMachinesResult {
600 let state = DynState::from_parts(self.module_instance_id, sm);
601
602 self.client
603 .executor
604 .add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
605 .await
606 }
607
608 async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
609 self.client.transaction_update_stream(self.operation).await
610 }
611
612 async fn log_event_json(
613 &self,
614 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
615 kind: EventKind,
616 module: Option<(ModuleKind, ModuleInstanceId)>,
617 payload: serde_json::Value,
618 persist: bool,
619 ) {
620 self.client
621 .log_event_raw_dbtx(
622 dbtx.global_tx(),
623 kind,
624 module,
625 serde_json::to_vec(&payload).expect("Serialization can't fail"),
626 persist,
627 )
628 .await;
629 }
630}
631
632fn states_add_instance(
633 module_instance_id: ModuleInstanceId,
634 state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
635) -> StateGenerator<DynState> {
636 Arc::new(move |out_point_range| {
637 let states = state_gen(out_point_range);
638 Iterator::collect(
639 states
640 .into_iter()
641 .map(|state| DynState::from_parts(module_instance_id, state)),
642 )
643 })
644}
645
646#[derive(Debug)]
656pub struct ClientHandle {
657 inner: Option<Arc<Client>>,
658}
659
660pub type ClientHandleArc = Arc<ClientHandle>;
662
663impl ClientHandle {
664 fn new(inner: Arc<Client>) -> Self {
666 ClientHandle {
667 inner: inner.into(),
668 }
669 }
670
671 fn as_inner(&self) -> &Arc<Client> {
672 self.inner.as_ref().expect("Inner always set")
673 }
674
675 pub fn start_executor(&self) {
676 self.as_inner().start_executor();
677 }
678
679 pub async fn shutdown(mut self) {
681 self.shutdown_inner().await;
682 }
683
684 async fn shutdown_inner(&mut self) {
685 let Some(inner) = self.inner.take() else {
686 error!(
687 target: LOG_CLIENT,
688 "ClientHandleShared::shutdown called twice"
689 );
690 return;
691 };
692 inner.executor.stop_executor();
693 let db = inner.db.clone();
694 debug!(target: LOG_CLIENT, "Waiting for client task group to shut down");
695 if let Err(err) = inner
696 .task_group
697 .clone()
698 .shutdown_join_all(Some(Duration::from_secs(30)))
699 .await
700 {
701 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Error waiting for client task group to shut down");
702 }
703
704 let client_strong_count = Arc::strong_count(&inner);
705 debug!(target: LOG_CLIENT, "Dropping last handle to Client");
706 drop(inner);
709
710 if client_strong_count != 1 {
711 debug!(target: LOG_CLIENT, count = client_strong_count - 1, LOG_CLIENT, "External Client references remaining after last handle dropped");
712 }
713
714 let db_strong_count = db.strong_count();
715 if db_strong_count != 1 {
716 debug!(target: LOG_CLIENT, count = db_strong_count - 1, "External DB references remaining after last handle dropped");
717 }
718 trace!(target: LOG_CLIENT, "Dropped last handle to Client");
719 }
720
721 pub async fn restart(self) -> anyhow::Result<ClientHandle> {
729 let (builder, config, api_secret, root_secret) = {
730 let client = self
731 .inner
732 .as_ref()
733 .ok_or_else(|| format_err!("Already stopped"))?;
734 let builder = ClientBuilder::from_existing(client);
735 let config = client.config().await;
736 let api_secret = client.api_secret.clone();
737 let root_secret = client.root_secret.clone();
738
739 (builder, config, api_secret, root_secret)
740 };
741 self.shutdown().await;
742
743 builder.build(root_secret, config, api_secret, false).await
744 }
745}
746
747impl ops::Deref for ClientHandle {
748 type Target = Client;
749
750 fn deref(&self) -> &Self::Target {
751 self.inner.as_ref().expect("Must have inner client set")
752 }
753}
754
755impl Drop for ClientHandle {
761 fn drop(&mut self) {
762 if self.inner.is_none() {
763 return;
764 }
765
766 #[cfg(target_family = "wasm")]
768 let can_block = false;
769 #[cfg(not(target_family = "wasm"))]
770 let can_block = RuntimeHandle::current().runtime_flavor() != RuntimeFlavor::CurrentThread;
772 if !can_block {
773 let inner = self.inner.take().expect("Must have inner client set");
774 inner.executor.stop_executor();
775 if cfg!(target_family = "wasm") {
776 error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on wasm, call ClientHandle::shutdown manually.");
777 } else {
778 error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on current thread runtime, call ClientHandle::shutdown manually.");
779 }
780 return;
781 }
782
783 debug!(target: LOG_CLIENT, "Shutting down the Client on last handle drop");
784 #[cfg(not(target_family = "wasm"))]
785 runtime::block_in_place(|| {
786 runtime::block_on(self.shutdown_inner());
787 });
788 }
789}
790
791const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
795 &[ApiVersion { major: 0, minor: 0 }];
796
797pub type ModuleGlobalContextGen = ContextGen;
798
799pub struct ClientModuleInstance<'m, M: ClientModule> {
801 pub id: ModuleInstanceId,
803 pub db: Database,
805 pub api: DynModuleApi,
807
808 module: &'m M,
809}
810
811impl<'m, M: ClientModule> ClientModuleInstance<'m, M> {
812 pub fn inner(&self) -> &'m M {
814 self.module
815 }
816}
817
818impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
819where
820 M: ClientModule,
821{
822 type Target = M;
823
824 fn deref(&self) -> &Self::Target {
825 self.module
826 }
827}
828
829pub struct Client {
843 config: RwLock<ClientConfig>,
844 api_secret: Option<String>,
845 decoders: ModuleDecoderRegistry,
846 db: Database,
847 federation_id: FederationId,
848 federation_config_meta: BTreeMap<String, String>,
849 primary_module_instance: ModuleInstanceId,
850 modules: ClientModuleRegistry,
851 module_inits: ClientModuleInitRegistry,
852 executor: Executor,
853 api: DynGlobalApi,
854 root_secret: DerivableSecret,
855 operation_log: OperationLog,
856 secp_ctx: Secp256k1<secp256k1::All>,
857 meta_service: Arc<MetaService>,
858 connector: Connector,
859
860 task_group: TaskGroup,
861
862 client_recovery_progress_receiver:
864 watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
865
866 log_ordering_wakeup_tx: watch::Sender<()>,
869 log_event_added_rx: watch::Receiver<()>,
871 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
872}
873
874impl Client {
875 pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
878 apply_migrations_core_client(
879 &db,
880 "fedimint-client".to_string(),
881 get_core_client_database_migrations(),
882 )
883 .await?;
884 Ok(ClientBuilder::new(db))
885 }
886
887 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
888 self.api.as_ref()
889 }
890
891 pub fn api_clone(&self) -> DynGlobalApi {
892 self.api.clone()
893 }
894
895 pub fn task_group(&self) -> &TaskGroup {
897 &self.task_group
898 }
899
900 #[doc(hidden)]
902 pub fn executor(&self) -> &Executor {
903 &self.executor
904 }
905
906 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
907 let mut dbtx = db.begin_transaction_nc().await;
908 dbtx.get_value(&ClientConfigKey).await
909 }
910
911 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
912 let mut dbtx = db.begin_transaction_nc().await;
913 dbtx.get_value(&ApiSecretKey).await
914 }
915
916 pub async fn store_encodable_client_secret<T: Encodable>(
917 db: &Database,
918 secret: T,
919 ) -> anyhow::Result<()> {
920 let mut dbtx = db.begin_transaction().await;
921
922 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
924 bail!("Encoded client secret already exists, cannot overwrite")
925 }
926
927 let encoded_secret = T::consensus_encode_to_vec(&secret);
928 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
929 .await;
930 dbtx.commit_tx().await;
931 Ok(())
932 }
933
934 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
935 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
936 bail!("Encoded client secret not present in DB")
937 };
938
939 Ok(secret)
940 }
941 pub async fn load_decodable_client_secret_opt<T: Decodable>(
942 db: &Database,
943 ) -> anyhow::Result<Option<T>> {
944 let mut dbtx = db.begin_transaction_nc().await;
945
946 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
947
948 Ok(match client_secret {
949 Some(client_secret) => Some(
950 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
951 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
952 ),
953 None => None,
954 })
955 }
956
957 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
958 let client_secret =
959 if let Ok(secret) = Self::load_decodable_client_secret::<[u8; 64]>(db).await {
960 secret
961 } else {
962 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
963 Self::store_encodable_client_secret(db, secret)
964 .await
965 .expect("Storing client secret must work");
966 secret
967 };
968 Ok(client_secret)
969 }
970
971 pub async fn is_initialized(db: &Database) -> bool {
972 Self::get_config_from_db(db).await.is_some()
973 }
974
975 pub fn start_executor(self: &Arc<Self>) {
976 debug!(
977 target: LOG_CLIENT,
978 "Starting fedimint client executor (version: {})",
979 fedimint_build_code_version_env!()
980 );
981 self.executor.start_executor(self.context_gen());
982 }
983
984 pub fn federation_id(&self) -> FederationId {
985 self.federation_id
986 }
987
988 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
989 let client_inner = Arc::downgrade(self);
990 Arc::new(move |module_instance, operation| {
991 ModuleGlobalClientContext {
992 client: client_inner
993 .clone()
994 .upgrade()
995 .expect("ModuleGlobalContextGen called after client was dropped"),
996 module_instance_id: module_instance,
997 operation,
998 }
999 .into()
1000 })
1001 }
1002
1003 pub async fn config(&self) -> ClientConfig {
1004 self.config.read().await.clone()
1005 }
1006
1007 pub fn api_secret(&self) -> &Option<String> {
1008 &self.api_secret
1009 }
1010
1011 pub fn decoders(&self) -> &ModuleDecoderRegistry {
1012 &self.decoders
1013 }
1014
1015 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1017 self.try_get_module(instance)
1018 .expect("Module instance not found")
1019 }
1020
1021 fn try_get_module(
1022 &self,
1023 instance: ModuleInstanceId,
1024 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
1025 Some(self.modules.get(instance)?.as_ref())
1026 }
1027
1028 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
1029 self.modules.get(instance).is_some()
1030 }
1031
1032 fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
1038 let mut in_amount = Amount::ZERO;
1040 let mut out_amount = Amount::ZERO;
1041 let mut fee_amount = Amount::ZERO;
1042
1043 for input in builder.inputs() {
1044 let module = self.get_module(input.input.module_instance_id());
1045
1046 let item_fee = module.input_fee(input.amount, &input.input).expect(
1047 "We only build transactions with input versions that are supported by the module",
1048 );
1049
1050 in_amount += input.amount;
1051 fee_amount += item_fee;
1052 }
1053
1054 for output in builder.outputs() {
1055 let module = self.get_module(output.output.module_instance_id());
1056
1057 let item_fee = module.output_fee(output.amount, &output.output).expect(
1058 "We only build transactions with output versions that are supported by the module",
1059 );
1060
1061 out_amount += output.amount;
1062 fee_amount += item_fee;
1063 }
1064
1065 (in_amount, out_amount + fee_amount)
1066 }
1067
1068 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1069 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
1070 }
1071
1072 pub fn get_config_meta(&self, key: &str) -> Option<String> {
1074 self.federation_config_meta.get(key).cloned()
1075 }
1076
1077 fn root_secret(&self) -> DerivableSecret {
1078 self.root_secret.clone()
1079 }
1080
1081 pub async fn add_state_machines(
1082 &self,
1083 dbtx: &mut DatabaseTransaction<'_>,
1084 states: Vec<DynState>,
1085 ) -> AddStateMachinesResult {
1086 self.executor.add_state_machines_dbtx(dbtx, states).await
1087 }
1088
1089 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
1091 let active_states = self.executor.get_active_states().await;
1092 let mut active_operations = HashSet::with_capacity(active_states.len());
1093 let mut dbtx = self.db().begin_transaction_nc().await;
1094 for (state, _) in active_states {
1095 let operation_id = state.operation_id();
1096 if dbtx
1097 .get_value(&OperationLogKey { operation_id })
1098 .await
1099 .is_some()
1100 {
1101 active_operations.insert(operation_id);
1102 }
1103 }
1104 active_operations
1105 }
1106
1107 pub fn operation_log(&self) -> &OperationLog {
1108 &self.operation_log
1109 }
1110
1111 pub fn meta_service(&self) -> &Arc<MetaService> {
1113 &self.meta_service
1114 }
1115
1116 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
1118 let meta_service = self.meta_service();
1119 let ts = meta_service
1120 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
1121 .await
1122 .and_then(|v| v.value)?;
1123 Some(UNIX_EPOCH + Duration::from_secs(ts))
1124 }
1125
1126 async fn finalize_transaction(
1128 &self,
1129 dbtx: &mut DatabaseTransaction<'_>,
1130 operation_id: OperationId,
1131 mut partial_transaction: TransactionBuilder,
1132 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
1133 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1134
1135 let (added_input_bundle, change_outputs) = self
1136 .primary_module()
1137 .create_final_inputs_and_outputs(
1138 self.primary_module_instance,
1139 dbtx,
1140 operation_id,
1141 input_amount,
1142 output_amount,
1143 )
1144 .await?;
1145
1146 let change_range = Range {
1150 start: partial_transaction.outputs().count() as u64,
1151 end: (partial_transaction.outputs().count() + change_outputs.outputs.len()) as u64,
1152 };
1153
1154 partial_transaction = partial_transaction
1155 .with_inputs(added_input_bundle)
1156 .with_outputs(change_outputs);
1157
1158 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1159
1160 assert!(input_amount >= output_amount, "Transaction is underfunded");
1161
1162 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
1163
1164 Ok((tx, states, change_range))
1165 }
1166
1167 pub async fn finalize_and_submit_transaction<F, M>(
1179 &self,
1180 operation_id: OperationId,
1181 operation_type: &str,
1182 operation_meta_gen: F,
1183 tx_builder: TransactionBuilder,
1184 ) -> anyhow::Result<OutPointRange>
1185 where
1186 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
1187 M: serde::Serialize + MaybeSend,
1188 {
1189 let operation_type = operation_type.to_owned();
1190
1191 let autocommit_res = self
1192 .db
1193 .autocommit(
1194 |dbtx, _| {
1195 let operation_type = operation_type.clone();
1196 let tx_builder = tx_builder.clone();
1197 let operation_meta_gen = operation_meta_gen.clone();
1198 Box::pin(async move {
1199 if Client::operation_exists_dbtx(dbtx, operation_id).await {
1200 bail!("There already exists an operation with id {operation_id:?}")
1201 }
1202
1203 let out_point_range = self
1204 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
1205 .await?;
1206
1207 self.operation_log()
1208 .add_operation_log_entry(
1209 dbtx,
1210 operation_id,
1211 &operation_type,
1212 operation_meta_gen(out_point_range),
1213 )
1214 .await;
1215
1216 Ok(out_point_range)
1217 })
1218 },
1219 Some(100), )
1221 .await;
1222
1223 match autocommit_res {
1224 Ok(txid) => Ok(txid),
1225 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
1226 Err(AutocommitError::CommitFailed {
1227 attempts,
1228 last_error,
1229 }) => panic!(
1230 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
1231 ),
1232 }
1233 }
1234
1235 async fn finalize_and_submit_transaction_inner(
1236 &self,
1237 dbtx: &mut DatabaseTransaction<'_>,
1238 operation_id: OperationId,
1239 tx_builder: TransactionBuilder,
1240 ) -> anyhow::Result<OutPointRange> {
1241 let (transaction, mut states, change_range) = self
1242 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
1243 .await?;
1244
1245 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
1246 let inputs = transaction
1247 .inputs
1248 .iter()
1249 .map(DynInput::module_instance_id)
1250 .collect::<Vec<_>>();
1251 let outputs = transaction
1252 .outputs
1253 .iter()
1254 .map(DynOutput::module_instance_id)
1255 .collect::<Vec<_>>();
1256 warn!(
1257 target: LOG_CLIENT_NET_API,
1258 size=%transaction.consensus_encode_to_vec().len(),
1259 ?inputs,
1260 ?outputs,
1261 "Transaction too large",
1262 );
1263 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
1264 bail!(
1265 "The generated transaction would be rejected by the federation for being too large."
1266 );
1267 }
1268
1269 let txid = transaction.tx_hash();
1270
1271 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
1272
1273 let tx_submission_sm = DynState::from_typed(
1274 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1275 TxSubmissionStatesSM {
1276 operation_id,
1277 state: TxSubmissionStates::Created(transaction),
1278 },
1279 );
1280 states.push(tx_submission_sm);
1281
1282 self.executor.add_state_machines_dbtx(dbtx, states).await?;
1283
1284 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
1285 .await;
1286
1287 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
1288 }
1289
1290 async fn transaction_update_stream(
1291 &self,
1292 operation_id: OperationId,
1293 ) -> BoxStream<'static, TxSubmissionStatesSM> {
1294 self.executor
1295 .notifier()
1296 .module_notifier::<TxSubmissionStatesSM>(TRANSACTION_SUBMISSION_MODULE_INSTANCE)
1297 .subscribe(operation_id)
1298 .await
1299 }
1300
1301 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
1302 let mut dbtx = self.db().begin_transaction_nc().await;
1303
1304 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
1305 }
1306
1307 pub async fn operation_exists_dbtx(
1308 dbtx: &mut DatabaseTransaction<'_>,
1309 operation_id: OperationId,
1310 ) -> bool {
1311 let active_state_exists = dbtx
1312 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1313 .await
1314 .next()
1315 .await
1316 .is_some();
1317
1318 let inactive_state_exists = dbtx
1319 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
1320 .await
1321 .next()
1322 .await
1323 .is_some();
1324
1325 active_state_exists || inactive_state_exists
1326 }
1327
1328 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
1329 self.db
1330 .begin_transaction_nc()
1331 .await
1332 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1333 .await
1334 .next()
1335 .await
1336 .is_some()
1337 }
1338
1339 pub async fn await_primary_module_output(
1342 &self,
1343 operation_id: OperationId,
1344 out_point: OutPoint,
1345 ) -> anyhow::Result<()> {
1346 self.primary_module()
1347 .await_primary_module_output(operation_id, out_point)
1348 .await
1349 }
1350
1351 pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
1353 let module_kind = M::kind();
1354 let id = self
1355 .get_first_instance(&module_kind)
1356 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
1357 let module: &M = self
1358 .try_get_module(id)
1359 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
1360 .as_any()
1361 .downcast_ref::<M>()
1362 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
1363 let (db, _) = self.db().with_prefix_module_id(id);
1364 Ok(ClientModuleInstance {
1365 id,
1366 db,
1367 api: self.api().with_module(id),
1368 module,
1369 })
1370 }
1371
1372 pub fn get_module_client_dyn(
1373 &self,
1374 instance_id: ModuleInstanceId,
1375 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
1376 self.try_get_module(instance_id)
1377 .ok_or(anyhow!("Unknown module instance {}", instance_id))
1378 }
1379
/// Returns a reference to the client's [`Database`] handle.
pub fn db(&self) -> &Database {
    &self.db
}
1383
1384 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1387 TransactionUpdates {
1388 update_stream: self.transaction_update_stream(operation_id).await,
1389 }
1390 }
1391
1392 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1396 if self
1397 .modules
1398 .get_with_kind(self.primary_module_instance)
1399 .is_some_and(|(kind, _)| kind == module_kind)
1400 {
1401 return Some(self.primary_module_instance);
1402 }
1403
1404 self.modules
1405 .iter_modules()
1406 .find(|(_, kind, _module)| *kind == module_kind)
1407 .map(|(instance_id, _, _)| instance_id)
1408 }
1409
/// Returns the result of `get_decoded_client_secret::<T>` on the client
/// database.
// NOTE(review): presumably this reads the stored client secret and decodes
// it as `T`; confirm against `get_decoded_client_secret`.
pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
    get_decoded_client_secret::<T>(self.db()).await
}
1415
1416 pub async fn await_primary_module_outputs(
1419 &self,
1420 operation_id: OperationId,
1421 outputs: Vec<OutPoint>,
1422 ) -> anyhow::Result<()> {
1423 for out_point in outputs {
1424 self.await_primary_module_output(operation_id, out_point)
1425 .await?;
1426 }
1427
1428 Ok(())
1429 }
1430
1431 pub async fn get_config_json(&self) -> JsonClientConfig {
1437 self.config().await.to_json()
1438 }
1439
1440 pub fn primary_module(&self) -> &DynClientModule {
1442 self.modules
1443 .get(self.primary_module_instance)
1444 .expect("primary module must be present")
1445 }
1446
1447 pub async fn get_balance(&self) -> Amount {
1449 self.primary_module()
1450 .get_balance(
1451 self.primary_module_instance,
1452 &mut self.db().begin_transaction_nc().await,
1453 )
1454 .await
1455 }
1456
1457 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
1460 let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
1461 let initial_balance = self.get_balance().await;
1462 let db = self.db().clone();
1463 let primary_module = self.primary_module().clone();
1464 let primary_module_instance = self.primary_module_instance;
1465
1466 Box::pin(stream! {
1467 yield initial_balance;
1468 let mut prev_balance = initial_balance;
1469 while let Some(()) = balance_changes.next().await {
1470 let mut dbtx = db.begin_transaction_nc().await;
1471 let balance = primary_module
1472 .get_balance(primary_module_instance, &mut dbtx)
1473 .await;
1474
1475 if balance != prev_balance {
1477 prev_balance = balance;
1478 yield balance;
1479 }
1480 }
1481 })
1482 }
1483
1484 pub async fn refresh_peers_api_versions(
1487 num_peers: NumPeers,
1488 api: DynGlobalApi,
1489 db: Database,
1490 num_responses_sender: watch::Sender<usize>,
1491 ) {
1492 async fn make_request(
1497 delay: Duration,
1498 peer_id: PeerId,
1499 api: &DynGlobalApi,
1500 ) -> (
1501 PeerId,
1502 Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
1503 ) {
1504 runtime::sleep(delay).await;
1505 (
1506 peer_id,
1507 api.request_single_peer::<SupportedApiVersionsSummary>(
1508 VERSION_ENDPOINT.to_owned(),
1509 ApiRequestErased::default(),
1510 peer_id,
1511 )
1512 .await,
1513 )
1514 }
1515
1516 let mut requests = FuturesUnordered::new();
1519
1520 for peer_id in num_peers.peer_ids() {
1521 requests.push(make_request(Duration::ZERO, peer_id, &api));
1522 }
1523
1524 let mut num_responses = 0;
1525
1526 while let Some((peer_id, response)) = requests.next().await {
1527 match response {
1528 Err(err) => {
1529 if db
1530 .begin_transaction_nc()
1531 .await
1532 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1533 .await
1534 .is_some()
1535 {
1536 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
1537 } else {
1538 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
1539 requests.push(make_request(Duration::from_secs(15), peer_id, &api));
1540 }
1541 }
1542 Ok(o) => {
1543 let mut dbtx = db.begin_transaction().await;
1546 dbtx.insert_entry(
1547 &PeerLastApiVersionsSummaryKey(peer_id),
1548 &PeerLastApiVersionsSummary(o),
1549 )
1550 .await;
1551 dbtx.commit_tx().await;
1552 num_responses += 1;
1553 let _ = num_responses_sender.send(num_responses);
1555 }
1556 }
1557 }
1558 }
1559
1560 pub fn supported_api_versions_summary_static(
1562 config: &ClientConfig,
1563 client_module_init: &ClientModuleInitRegistry,
1564 ) -> SupportedApiVersionsSummary {
1565 SupportedApiVersionsSummary {
1566 core: SupportedCoreApiVersions {
1567 core_consensus: config.global.consensus_version,
1568 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1569 .expect("must not have conflicting versions"),
1570 },
1571 modules: config
1572 .modules
1573 .iter()
1574 .filter_map(|(&module_instance_id, module_config)| {
1575 client_module_init
1576 .get(module_config.kind())
1577 .map(|module_init| {
1578 (
1579 module_instance_id,
1580 SupportedModuleApiVersions {
1581 core_consensus: config.global.consensus_version,
1582 module_consensus: module_config.version,
1583 api: module_init.supported_api_versions(),
1584 },
1585 )
1586 })
1587 })
1588 .collect(),
1589 }
1590 }
1591
1592 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1593 Self::load_and_refresh_common_api_version_static(
1594 &self.config().await,
1595 &self.module_inits,
1596 &self.api,
1597 &self.db,
1598 &self.task_group,
1599 )
1600 .await
1601 }
1602
1603 async fn load_and_refresh_common_api_version_static(
1609 config: &ClientConfig,
1610 module_init: &ClientModuleInitRegistry,
1611 api: &DynGlobalApi,
1612 db: &Database,
1613 task_group: &TaskGroup,
1614 ) -> anyhow::Result<ApiVersionSet> {
1615 if let Some(v) = db
1616 .begin_transaction_nc()
1617 .await
1618 .get_value(&CachedApiVersionSetKey)
1619 .await
1620 {
1621 debug!(
1622 target: LOG_CLIENT,
1623 "Found existing cached common api versions"
1624 );
1625 let config = config.clone();
1626 let client_module_init = module_init.clone();
1627 let api = api.clone();
1628 let db = db.clone();
1629 let task_group = task_group.clone();
1630 task_group
1633 .clone()
1634 .spawn_cancellable("refresh_common_api_version_static", async move {
1635 if let Err(error) = Self::refresh_common_api_version_static(
1636 &config,
1637 &client_module_init,
1638 &api,
1639 &db,
1640 task_group,
1641 )
1642 .await
1643 {
1644 warn!(
1645 target: LOG_CLIENT,
1646 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1647 );
1648 }
1649 });
1650
1651 return Ok(v.0);
1652 }
1653
1654 debug!(
1655 target: LOG_CLIENT,
1656 "No existing cached common api versions found, waiting for initial discovery"
1657 );
1658 Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
1659 .await
1660 }
1661
1662 async fn refresh_common_api_version_static(
1663 config: &ClientConfig,
1664 client_module_init: &ClientModuleInitRegistry,
1665 api: &DynGlobalApi,
1666 db: &Database,
1667 task_group: TaskGroup,
1668 ) -> anyhow::Result<ApiVersionSet> {
1669 debug!(
1670 target: LOG_CLIENT,
1671 "Refreshing common api versions"
1672 );
1673
1674 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1675 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1676
1677 task_group.spawn_cancellable("refresh peers api versions", {
1678 Client::refresh_peers_api_versions(
1679 num_peers,
1680 api.clone(),
1681 db.clone(),
1682 num_responses_sender,
1683 )
1684 });
1685
1686 let _: Result<_, Elapsed> = runtime::timeout(
1694 Duration::from_secs(15),
1695 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1696 )
1697 .await;
1698
1699 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1700
1701 let common_api_versions = discover_common_api_versions_set(
1702 &Self::supported_api_versions_summary_static(config, client_module_init),
1703 &peer_api_version_sets,
1704 )?;
1705
1706 debug!(
1707 target: LOG_CLIENT,
1708 value = ?common_api_versions,
1709 "Updating the cached common api versions"
1710 );
1711 let mut dbtx = db.begin_transaction().await;
1712 let _ = dbtx
1713 .insert_entry(
1714 &CachedApiVersionSetKey,
1715 &CachedApiVersionSet(common_api_versions.clone()),
1716 )
1717 .await;
1718
1719 dbtx.commit_tx().await;
1720
1721 Ok(common_api_versions)
1722 }
1723
1724 pub async fn get_metadata(&self) -> Metadata {
1726 self.db
1727 .begin_transaction_nc()
1728 .await
1729 .get_value(&ClientMetadataKey)
1730 .await
1731 .unwrap_or_else(|| {
1732 warn!(
1733 target: LOG_CLIENT,
1734 "Missing existing metadata. This key should have been set on Client init"
1735 );
1736 Metadata::empty()
1737 })
1738 }
1739
1740 pub async fn set_metadata(&self, metadata: &Metadata) {
1742 self.db
1743 .autocommit::<_, _, anyhow::Error>(
1744 |dbtx, _| {
1745 Box::pin(async {
1746 Self::set_metadata_dbtx(dbtx, metadata).await;
1747 Ok(())
1748 })
1749 },
1750 None,
1751 )
1752 .await
1753 .expect("Failed to autocommit metadata");
1754 }
1755
1756 pub fn has_pending_recoveries(&self) -> bool {
1757 !self
1758 .client_recovery_progress_receiver
1759 .borrow()
1760 .iter()
1761 .all(|(_id, progress)| progress.is_done())
1762 }
1763
1764 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1772 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1773 recovery_receiver
1774 .wait_for(|in_progress| {
1775 in_progress
1776 .iter()
1777 .all(|(_id, progress)| progress.is_done())
1778 })
1779 .await
1780 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1781
1782 Ok(())
1783 }
1784
1785 pub fn subscribe_to_recovery_progress(
1790 &self,
1791 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
1792 WatchStream::new(self.client_recovery_progress_receiver.clone())
1793 .flat_map(futures::stream::iter)
1794 }
1795
1796 pub async fn wait_for_module_kind_recovery(
1797 &self,
1798 module_kind: ModuleKind,
1799 ) -> anyhow::Result<()> {
1800 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1801 let config = self.config().await;
1802 recovery_receiver
1803 .wait_for(|in_progress| {
1804 !in_progress
1805 .iter()
1806 .filter(|(module_instance_id, _progress)| {
1807 config.modules[module_instance_id].kind == module_kind
1808 })
1809 .any(|(_id, progress)| !progress.is_done())
1810 })
1811 .await
1812 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1813
1814 Ok(())
1815 }
1816
1817 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1818 loop {
1819 if self.executor.get_active_states().await.is_empty() {
1820 break;
1821 }
1822 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1823 }
1824 Ok(())
1825 }
1826
/// Writes `metadata` under `ClientMetadataKey` in the given transaction.
// NOTE(review): this uses `insert_new_entry`; how it behaves when an entry
// already exists depends on that helper — confirm before calling twice.
pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
    dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
}
1831
1832 fn spawn_module_recoveries_task(
1833 &self,
1834 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1835 module_recoveries: BTreeMap<
1836 ModuleInstanceId,
1837 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1838 >,
1839 module_recovery_progress_receivers: BTreeMap<
1840 ModuleInstanceId,
1841 watch::Receiver<RecoveryProgress>,
1842 >,
1843 ) {
1844 let db = self.db.clone();
1845 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1846 self.task_group
1847 .spawn("module recoveries", |_task_handle| async {
1848 Self::run_module_recoveries_task(
1849 db,
1850 log_ordering_wakeup_tx,
1851 recovery_sender,
1852 module_recoveries,
1853 module_recovery_progress_receivers,
1854 )
1855 .await;
1856 });
1857 }
1858
1859 async fn run_module_recoveries_task(
1860 db: Database,
1861 log_ordering_wakeup_tx: watch::Sender<()>,
1862 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1863 module_recoveries: BTreeMap<
1864 ModuleInstanceId,
1865 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1866 >,
1867 module_recovery_progress_receivers: BTreeMap<
1868 ModuleInstanceId,
1869 watch::Receiver<RecoveryProgress>,
1870 >,
1871 ) {
1872 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1873 let mut completed_stream = Vec::new();
1874 let progress_stream = futures::stream::FuturesUnordered::new();
1875
1876 for (module_instance_id, f) in module_recoveries {
1877 completed_stream.push(futures::stream::once(Box::pin(async move {
1878 match f.await {
1879 Ok(()) => (module_instance_id, None),
1880 Err(err) => {
1881 warn!(
1882 target: LOG_CLIENT,
1883 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1884 );
1885 futures::future::pending::<()>().await;
1889 unreachable!()
1890 }
1891 }
1892 })));
1893 }
1894
1895 for (module_instance_id, rx) in module_recovery_progress_receivers {
1896 progress_stream.push(
1897 tokio_stream::wrappers::WatchStream::new(rx)
1898 .fuse()
1899 .map(move |progress| (module_instance_id, Some(progress))),
1900 );
1901 }
1902
1903 let mut futures = futures::stream::select(
1904 futures::stream::select_all(progress_stream),
1905 futures::stream::select_all(completed_stream),
1906 );
1907
1908 while let Some((module_instance_id, progress)) = futures.next().await {
1909 let mut dbtx = db.begin_transaction().await;
1910
1911 let prev_progress = *recovery_sender
1912 .borrow()
1913 .get(&module_instance_id)
1914 .expect("existing progress must be present");
1915
1916 let progress = if prev_progress.is_done() {
1917 prev_progress
1919 } else if let Some(progress) = progress {
1920 progress
1921 } else {
1922 prev_progress.to_complete()
1923 };
1924
1925 if !prev_progress.is_done() && progress.is_done() {
1926 info!(
1927 target: LOG_CLIENT,
1928 module_instance_id,
1929 progress = format!("{}/{}", progress.complete, progress.total),
1930 "Recovery complete"
1931 );
1932 dbtx.log_event(
1933 log_ordering_wakeup_tx.clone(),
1934 None,
1935 ModuleRecoveryCompleted {
1936 module_id: module_instance_id,
1937 },
1938 )
1939 .await;
1940 } else {
1941 info!(
1942 target: LOG_CLIENT,
1943 module_instance_id,
1944 progress = format!("{}/{}", progress.complete, progress.total),
1945 "Recovery progress"
1946 );
1947 }
1948
1949 dbtx.insert_entry(
1950 &ClientModuleRecovery { module_instance_id },
1951 &ClientModuleRecoveryState { progress },
1952 )
1953 .await;
1954 dbtx.commit_tx().await;
1955
1956 recovery_sender.send_modify(|v| {
1957 v.insert(module_instance_id, progress);
1958 });
1959 }
1960 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1961 }
1962
1963 async fn load_peers_last_api_versions(
1964 db: &Database,
1965 num_peers: NumPeers,
1966 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1967 let mut peer_api_version_sets = BTreeMap::new();
1968
1969 let mut dbtx = db.begin_transaction_nc().await;
1970 for peer_id in num_peers.peer_ids() {
1971 if let Some(v) = dbtx
1972 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1973 .await
1974 {
1975 peer_api_version_sets.insert(peer_id, v.0);
1976 }
1977 }
1978 drop(dbtx);
1979 peer_api_version_sets
1980 }
1981
1982 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1985 self.db()
1986 .begin_transaction_nc()
1987 .await
1988 .find_by_prefix(&ApiAnnouncementPrefix)
1989 .await
1990 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1991 .collect()
1992 .await
1993 }
1994
1995 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1997 get_api_urls(&self.db, &self.config().await).await
1998 }
1999
2000 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2003 self.get_peer_urls()
2004 .await
2005 .into_iter()
2006 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
2007 .map(|peer_url| {
2008 InviteCode::new(
2009 peer_url.clone(),
2010 peer,
2011 self.federation_id(),
2012 self.api_secret.clone(),
2013 )
2014 })
2015 }
2016
/// Returns the guardians' broadcast public keys, fetching them if needed.
///
/// If the current config already carries `broadcast_public_keys`, they are
/// returned directly. Otherwise the client config is requested from the
/// federation with retries. The fetched keys are then written into a new
/// `ClientConfig`, which is stored under `ClientConfigKey` and replaces the
/// in-memory config.
///
/// If the fetched config has no keys either, a warning is logged and this
/// future never completes.
pub async fn get_guardian_public_keys_blocking(
    &self,
) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
    self.db.autocommit(|dbtx, _| Box::pin(async move {
        let config = self.config().await;

        let guardian_pub_keys = if let Some(guardian_pub_keys) = config.global.broadcast_public_keys {guardian_pub_keys}else{
            // Keys are missing locally: ask the federation for its config.
            let fetched_config = retry(
                "Fetching guardian public keys",
                backoff_util::background_backoff(),
                || async {
                    Ok(self.api.request_current_consensus::<ClientConfig>(
                        CLIENT_CONFIG_ENDPOINT.to_owned(),
                        ApiRequestErased::default(),
                    ).await?)
                },
            )
            .await
            .expect("Will never return on error");

            let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
                warn!(
                    target: LOG_CLIENT,
                    "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
                );
                // Nothing to return: wait forever instead of failing.
                pending::<()>().await;
                unreachable!("Pending will never return");
            };

            // Same config as before, with only the public keys filled in.
            let new_config = ClientConfig {
                global: GlobalClientConfig {
                    broadcast_public_keys: Some(guardian_pub_keys.clone()),
                    ..config.global
                },
                modules: config.modules,
            };

            // Persist the new config, then swap the in-memory copy.
            dbtx.insert_entry(&ClientConfigKey, &new_config).await;
            *(self.config.write().await) = new_config;
            guardian_pub_keys
        };

        Result::<_, ()>::Ok(guardian_pub_keys)
    }), None).await.expect("Will retry forever")
}
2065
2066 pub fn handle_global_rpc(
2067 &self,
2068 method: String,
2069 params: serde_json::Value,
2070 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
2071 Box::pin(try_stream! {
2072 match method.as_str() {
2073 "get_balance" => {
2074 let balance = self.get_balance().await;
2075 yield serde_json::to_value(balance)?;
2076 }
2077 "subscribe_balance_changes" => {
2078 let mut stream = self.subscribe_balance_changes().await;
2079 while let Some(balance) = stream.next().await {
2080 yield serde_json::to_value(balance)?;
2081 }
2082 }
2083 "get_config" => {
2084 let config = self.config().await;
2085 yield serde_json::to_value(config)?;
2086 }
2087 "get_federation_id" => {
2088 let federation_id = self.federation_id();
2089 yield serde_json::to_value(federation_id)?;
2090 }
2091 "get_invite_code" => {
2092 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
2093 let invite_code = self.invite_code(req.peer).await;
2094 yield serde_json::to_value(invite_code)?;
2095 }
2096 "list_operations" => {
2097 let operations = self.operation_log().paginate_operations_rev(usize::MAX, None).await;
2099 yield serde_json::to_value(operations)?;
2100 }
2101 "has_pending_recoveries" => {
2102 let has_pending = self.has_pending_recoveries();
2103 yield serde_json::to_value(has_pending)?;
2104 }
2105 "wait_for_all_recoveries" => {
2106 self.wait_for_all_recoveries().await?;
2107 yield serde_json::Value::Null;
2108 }
2109 "subscribe_to_recovery_progress" => {
2110 let mut stream = self.subscribe_to_recovery_progress();
2111 while let Some((module_id, progress)) = stream.next().await {
2112 yield serde_json::json!({
2113 "module_id": module_id,
2114 "progress": progress
2115 });
2116 }
2117 }
2118 _ => {
2119 Err(anyhow::format_err!("Unknown method: {}", method))?;
2120 unreachable!()
2121 },
2122 }
2123 })
2124 }
2125
2126 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
2127 where
2128 E: Event + Send,
2129 {
2130 let mut dbtx = self.db.begin_transaction().await;
2131 self.log_event_dbtx(&mut dbtx, module_id, event).await;
2132 dbtx.commit_tx().await;
2133 }
2134
2135 pub async fn log_event_dbtx<E, Cap>(
2136 &self,
2137 dbtx: &mut DatabaseTransaction<'_, Cap>,
2138 module_id: Option<ModuleInstanceId>,
2139 event: E,
2140 ) where
2141 E: Event + Send,
2142 Cap: Send,
2143 {
2144 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
2145 .await;
2146 }
2147
2148 pub async fn log_event_raw_dbtx<Cap>(
2149 &self,
2150 dbtx: &mut DatabaseTransaction<'_, Cap>,
2151 kind: EventKind,
2152 module: Option<(ModuleKind, ModuleInstanceId)>,
2153 payload: Vec<u8>,
2154 persist: bool,
2155 ) where
2156 Cap: Send,
2157 {
2158 let module_id = module.as_ref().map(|m| m.1);
2159 let module_kind = module.map(|m| m.0);
2160 dbtx.log_event_raw(
2161 self.log_ordering_wakeup_tx.clone(),
2162 kind,
2163 module_kind,
2164 module_id,
2165 payload,
2166 persist,
2167 )
2168 .await;
2169 }
2170
2171 pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
2172 where
2173 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
2174 K: DatabaseRecord<Value = EventLogId>,
2175 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2176 R: Future<Output = anyhow::Result<()>>,
2177 {
2178 fedimint_eventlog::handle_events(
2179 self.db.clone(),
2180 pos_key,
2181 self.log_event_added_rx.clone(),
2182 call_fn,
2183 )
2184 .await
2185 }
2186
2187 pub async fn get_event_log(
2188 &self,
2189 pos: Option<EventLogId>,
2190 limit: u64,
2191 ) -> Vec<(
2192 EventLogId,
2193 EventKind,
2194 Option<(ModuleKind, ModuleInstanceId)>,
2195 u64,
2196 serde_json::Value,
2197 )> {
2198 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2199 .await
2200 }
2201
/// Reads event log entries through the supplied database transaction by
/// delegating to its `get_event_log(pos, limit)`.
// NOTE(review): presumably `pos` is the starting position and `limit` the
// maximum number of entries — confirm against `get_event_log`.
pub async fn get_event_log_dbtx<Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    pos: Option<EventLogId>,
    limit: u64,
) -> Vec<(
    EventLogId,
    EventKind,
    Option<(ModuleKind, ModuleInstanceId)>,
    u64,
    serde_json::Value,
)>
where
    Cap: Send,
{
    dbtx.get_event_log(pos, limit).await
}
2219
/// Returns a new receiver subscribed to the transient event log broadcast
/// channel.
pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
    self.log_event_added_transient_tx.subscribe()
}
2224}
2225
// Implements `ClientContextIface` by forwarding each method to the inherent
// `Client` method of the same name. Calls are written as `Client::name(self, ..)`
// so they resolve to the inherent methods.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        // The boxed generator is passed by reference.
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &OperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &Executor {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    // Serializes the JSON payload to bytes and logs it as a raw event.
    // Panics if `dbtx.ensure_global()` returns an error.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            // The module pair is only passed along when a kind was given.
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }
}
/// Parameters of the `get_invite_code` RPC method.
#[derive(Deserialize)]
struct GetInviteCodeRequest {
    /// Peer passed to `Client::invite_code`.
    peer: PeerId,
}
2336
/// Wrapper around a stream of transaction-submission state machine updates.
pub struct TransactionUpdates {
    /// Stream of `TxSubmissionStatesSM` updates.
    update_stream: BoxStream<'static, TxSubmissionStatesSM>,
}
2341
2342impl TransactionUpdates {
2343 pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
2346 debug!(target: LOG_CLIENT, %await_txid, "Await tx accepted");
2347 self.update_stream
2348 .filter_map(|tx_update| {
2349 std::future::ready(match tx_update.state {
2350 TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
2351 TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
2352 Some(Err(submit_error))
2353 }
2354 _ => None,
2355 })
2356 })
2357 .next_or_pending()
2358 .await?;
2359 debug!(target: LOG_CLIENT, %await_txid, "Tx accepted");
2360 Ok(())
2361 }
2362}
2363
/// Admin credentials that can be supplied to a [`ClientBuilder`].
pub struct AdminCreds {
    /// Peer id the credentials belong to (presumably the guardian's own id).
    pub peer_id: PeerId,
    /// API authentication value used together with `peer_id`.
    pub auth: ApiAuth,
}
2371
/// Collects the settings from which a client is built.
pub struct ClientBuilder {
    /// Registry of the module inits available to the client.
    module_inits: ClientModuleInitRegistry,
    /// Primary module selected by instance id, if any.
    primary_module_instance: Option<ModuleInstanceId>,
    /// Primary module selected by kind, if any.
    primary_module_kind: Option<ModuleKind>,
    /// Optional admin credentials.
    admin_creds: Option<AdminCreds>,
    /// Database handle without module decoders attached.
    db_no_decoders: Database,
    /// Meta service shared with the built client.
    meta_service: Arc<MetaService>,
    /// Connector used for federation connections.
    connector: Connector,
    /// Flag set by `stopped()`.
    stopped: bool,
    /// Sender side of the transient event log broadcast channel.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
}
2384
2385impl ClientBuilder {
2386 fn new(db: Database) -> Self {
2387 let meta_service = MetaService::new(LegacyMetaSource::default());
2388 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
2389 broadcast::channel(1024);
2390 ClientBuilder {
2391 module_inits: ModuleInitRegistry::new(),
2392 primary_module_instance: None,
2393 primary_module_kind: None,
2394 connector: Connector::default(),
2395 admin_creds: None,
2396 db_no_decoders: db,
2397 stopped: false,
2398 meta_service,
2399 log_event_added_transient_tx,
2400 }
2401 }
2402
2403 fn from_existing(client: &Client) -> Self {
2404 ClientBuilder {
2405 module_inits: client.module_inits.clone(),
2406 primary_module_instance: Some(client.primary_module_instance),
2407 primary_module_kind: None,
2408 admin_creds: None,
2409 db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
2410 stopped: false,
2411 meta_service: client.meta_service.clone(),
2413 connector: client.connector,
2414 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
2415 }
2416 }
2417
/// Replaces the whole module init registry with `module_inits`.
pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
    self.module_inits = module_inits;
}
2422
/// Attaches a single module init to the builder's registry.
pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
    self.module_inits.attach(module_init);
}
2427
/// Sets the builder's `stopped` flag.
// NOTE(review): presumably the built client then does not start its
// executor automatically — confirm where the flag is read.
pub fn stopped(&mut self) {
    self.stopped = true;
}
2431
/// Deprecated alias of [`Self::with_primary_module_instance_id`].
#[deprecated(
    since = "0.6.0",
    note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
)]
pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
    self.with_primary_module_instance_id(primary_module_instance);
}
2444
2445 pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
2459 let was_replaced = self
2460 .primary_module_instance
2461 .replace(primary_module_instance)
2462 .is_some();
2463 assert!(
2464 !was_replaced,
2465 "Only one primary module can be given to the builder."
2466 );
2467 }
2468
2469 pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
2475 let was_replaced = self
2476 .primary_module_kind
2477 .replace(primary_module_kind)
2478 .is_some();
2479 assert!(
2480 !was_replaced,
2481 "Only one primary module kind can be given to the builder."
2482 );
2483 }
2484
2485 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
2486 self.meta_service = meta_service;
2487 }
2488
2489 async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> {
2490 if let Ok(client_config) = self.load_existing_config().await {
2494 for (module_id, module_cfg) in client_config.modules {
2495 let kind = module_cfg.kind.clone();
2496 let Some(init) = self.module_inits.get(&kind) else {
2497 continue;
2499 };
2500
2501 apply_migrations_client(
2502 db,
2503 kind.to_string(),
2504 init.get_database_migrations(),
2505 module_id,
2506 )
2507 .await?;
2508 }
2509 }
2510
2511 Ok(())
2512 }
2513
2514 pub fn db_no_decoders(&self) -> &Database {
2515 &self.db_no_decoders
2516 }
2517
2518 pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
2519 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2520 bail!("Client database not initialized")
2521 };
2522
2523 Ok(config)
2524 }
2525
2526 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
2527 self.admin_creds = Some(creds);
2528 }
2529
2530 pub fn with_connector(&mut self, connector: Connector) {
2531 self.connector = connector;
2532 }
2533
/// Switches the builder to the Tor connector (`Connector::tor()`).
///
/// Only available when the `tor` feature is enabled.
#[cfg(feature = "tor")]
pub fn with_tor_connector(&mut self) {
    self.connector = Connector::tor();
}
2538
2539 async fn init(
2540 self,
2541 pre_root_secret: DerivableSecret,
2542 config: ClientConfig,
2543 api_secret: Option<String>,
2544 init_mode: InitMode,
2545 ) -> anyhow::Result<ClientHandle> {
2546 if Client::is_initialized(&self.db_no_decoders).await {
2547 bail!("Client database already initialized")
2548 }
2549
2550 {
2553 debug!(target: LOG_CLIENT, "Initializing client database");
2554 let mut dbtx = self.db_no_decoders.begin_transaction().await;
2555 dbtx.insert_new_entry(&ClientConfigKey, &config).await;
2557 dbtx.insert_entry(
2558 &ClientPreRootSecretHashKey,
2559 &pre_root_secret.derive_pre_root_secret_hash(),
2560 )
2561 .await;
2562
2563 if let Some(api_secret) = api_secret.as_ref() {
2564 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
2565 }
2566
2567 let init_state = InitState::Pending(init_mode);
2568 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
2569
2570 let metadata = init_state
2571 .does_require_recovery()
2572 .flatten()
2573 .map_or(Metadata::empty(), |s| s.metadata);
2574
2575 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
2576
2577 dbtx.commit_tx_result().await?;
2578 }
2579
2580 let stopped = self.stopped;
2581 self.build(pre_root_secret, config, api_secret, stopped)
2582 .await
2583 }
2584
2585 pub async fn join(
2659 self,
2660 pre_root_secret: DerivableSecret,
2661 config: ClientConfig,
2662 api_secret: Option<String>,
2663 ) -> anyhow::Result<ClientHandle> {
2664 self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
2665 .await
2666 }
2667
2668 pub async fn download_backup_from_federation(
2670 &self,
2671 root_secret: &DerivableSecret,
2672 config: &ClientConfig,
2673 api_secret: Option<String>,
2674 ) -> anyhow::Result<Option<ClientBackup>> {
2675 let connector = self.connector;
2676 let api = DynGlobalApi::from_endpoints(
2677 config
2679 .global
2680 .api_endpoints
2681 .iter()
2682 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
2683 &api_secret,
2684 &connector,
2685 );
2686 Client::download_backup_from_federation_static(
2687 &api,
2688 &Self::federation_root_secret(root_secret, config),
2689 &self.decoders(config),
2690 )
2691 .await
2692 }
2693
2694 pub async fn recover(
2705 self,
2706 root_secret: DerivableSecret,
2707 config: ClientConfig,
2708 api_secret: Option<String>,
2709 backup: Option<ClientBackup>,
2710 ) -> anyhow::Result<ClientHandle> {
2711 let client = self
2712 .init(
2713 root_secret,
2714 config,
2715 api_secret,
2716 InitMode::Recover {
2717 snapshot: backup.clone(),
2718 },
2719 )
2720 .await?;
2721
2722 Ok(client)
2723 }
2724
2725 pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
2726 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2727 bail!("Client database not initialized")
2728 };
2729
2730 if let Some(secret_hash) = self
2731 .db_no_decoders()
2732 .begin_transaction_nc()
2733 .await
2734 .get_value(&ClientPreRootSecretHashKey)
2735 .await
2736 {
2737 ensure!(
2738 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
2739 "Secret hash does not match. Incorrect secret"
2740 );
2741 } else {
2742 debug!(target: LOG_CLIENT, "Backfilling secret hash");
2743 let mut dbtx = self.db_no_decoders.begin_transaction().await;
2745 dbtx.insert_entry(
2746 &ClientPreRootSecretHashKey,
2747 &pre_root_secret.derive_pre_root_secret_hash(),
2748 )
2749 .await;
2750 dbtx.commit_tx().await;
2751 }
2752
2753 let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
2754 let stopped = self.stopped;
2755
2756 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2757 let client = self
2758 .build_stopped(
2759 pre_root_secret,
2760 &config,
2761 api_secret,
2762 log_event_added_transient_tx,
2763 )
2764 .await?;
2765 if !stopped {
2766 client.as_inner().start_executor();
2767 }
2768 Ok(client)
2769 }
2770
2771 async fn build(
2773 self,
2774 pre_root_secret: DerivableSecret,
2775 config: ClientConfig,
2776 api_secret: Option<String>,
2777 stopped: bool,
2778 ) -> anyhow::Result<ClientHandle> {
2779 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
2780 let client = self
2781 .build_stopped(
2782 pre_root_secret,
2783 &config,
2784 api_secret,
2785 log_event_added_transient_tx,
2786 )
2787 .await?;
2788 if !stopped {
2789 client.as_inner().start_executor();
2790 }
2791
2792 Ok(client)
2793 }
2794
2795 async fn build_stopped(
2798 self,
2799 root_secret: DerivableSecret,
2800 config: &ClientConfig,
2801 api_secret: Option<String>,
2802 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
2803 ) -> anyhow::Result<ClientHandle> {
2804 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
2805 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
2806
2807 let decoders = self.decoders(config);
2808 let config = Self::config_decoded(config, &decoders)?;
2809 let fed_id = config.calculate_federation_id();
2810 let db = self.db_no_decoders.with_decoders(decoders.clone());
2811 let connector = self.connector;
2812 let peer_urls = get_api_urls(&db, &config).await;
2813 let api = if let Some(admin_creds) = self.admin_creds.as_ref() {
2814 ReconnectFederationApi::new_admin(
2815 admin_creds.peer_id,
2816 peer_urls
2817 .into_iter()
2818 .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
2819 .context("Admin creds should match a peer")?,
2820 &api_secret,
2821 &connector,
2822 )
2823 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2824 .with_cache()
2825 .into()
2826 } else {
2827 ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, &connector, None)
2828 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
2829 .with_cache()
2830 .into()
2831 };
2832 let task_group = TaskGroup::new();
2833
2834 self.migrate_database(&db).await?;
2837
2838 let init_state = Self::load_init_state(&db).await;
2839
2840 let primary_module_instance = self
2841 .primary_module_instance
2842 .or_else(|| {
2843 let primary_module_kind = self.primary_module_kind?;
2844 config
2845 .modules
2846 .iter()
2847 .find_map(|(module_instance_id, module_config)| {
2848 (module_config.kind() == &primary_module_kind)
2849 .then_some(*module_instance_id)
2850 })
2851 })
2852 .ok_or(anyhow!("No primary module set or found"))?;
2853
2854 let notifier = Notifier::new(db.clone());
2855
2856 let common_api_versions = Client::load_and_refresh_common_api_version_static(
2857 &config,
2858 &self.module_inits,
2859 &api,
2860 &db,
2861 &task_group,
2862 )
2863 .await
2864 .inspect_err(|err| {
2865 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
2866 })
2867 .unwrap_or(ApiVersionSet {
2868 core: ApiVersion::new(0, 0),
2869 modules: BTreeMap::new(),
2871 });
2872
2873 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
2874
2875 let mut module_recoveries: BTreeMap<
2876 ModuleInstanceId,
2877 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
2878 > = BTreeMap::new();
2879 let mut module_recovery_progress_receivers: BTreeMap<
2880 ModuleInstanceId,
2881 watch::Receiver<RecoveryProgress>,
2882 > = BTreeMap::new();
2883
2884 let final_client = FinalClientIface::default();
2885
2886 let root_secret = Self::federation_root_secret(&root_secret, &config);
2887
2888 let modules = {
2889 let mut modules = ClientModuleRegistry::default();
2890 for (module_instance_id, module_config) in config.modules.clone() {
2891 let kind = module_config.kind().clone();
2892 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
2893 debug!(
2894 target: LOG_CLIENT,
2895 kind=%kind,
2896 instance_id=%module_instance_id,
2897 "Module kind of instance not found in module gens, skipping");
2898 continue;
2899 };
2900
2901 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
2902 else {
2903 warn!(
2904 target: LOG_CLIENT,
2905 kind=%kind,
2906 instance_id=%module_instance_id,
2907 "Module kind of instance has incompatible api version, skipping"
2908 );
2909 continue;
2910 };
2911
2912 let start_module_recover_fn =
2915 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
2916 let module_config = module_config.clone();
2917 let num_peers = NumPeers::from(config.global.api_endpoints.len());
2918 let db = db.clone();
2919 let kind = kind.clone();
2920 let notifier = notifier.clone();
2921 let api = api.clone();
2922 let root_secret = root_secret.clone();
2923 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
2924 let final_client = final_client.clone();
2925 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
2926 let task_group = task_group.clone();
2927 let module_init = module_init.clone();
2928 (
2929 Box::pin(async move {
2930 module_init
2931 .recover(
2932 final_client.clone(),
2933 fed_id,
2934 num_peers,
2935 module_config.clone(),
2936 db.clone(),
2937 module_instance_id,
2938 common_api_versions.core,
2939 api_version,
2940 root_secret.derive_module_secret(module_instance_id),
2941 notifier.clone(),
2942 api.clone(),
2943 admin_auth,
2944 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
2945 progress_tx,
2946 task_group,
2947 )
2948 .await
2949 .inspect_err(|err| {
2950 warn!(
2951 target: LOG_CLIENT,
2952 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
2953 );
2954 })
2955 }),
2956 progress_rx,
2957 )
2958 };
2959
2960 let recovery = if let Some(snapshot) = init_state.does_require_recovery() {
2961 if let Some(module_recovery_state) = db
2962 .begin_transaction_nc()
2963 .await
2964 .get_value(&ClientModuleRecovery { module_instance_id })
2965 .await
2966 {
2967 if module_recovery_state.is_done() {
2968 debug!(
2969 id = %module_instance_id,
2970 %kind, "Module recovery already complete"
2971 );
2972 None
2973 } else {
2974 debug!(
2975 id = %module_instance_id,
2976 %kind,
2977 progress = %module_recovery_state.progress,
2978 "Starting module recovery with an existing progress"
2979 );
2980 Some(start_module_recover_fn(
2981 snapshot,
2982 module_recovery_state.progress,
2983 ))
2984 }
2985 } else {
2986 let progress = RecoveryProgress::none();
2987 let mut dbtx = db.begin_transaction().await;
2988 dbtx.log_event(
2989 log_ordering_wakeup_tx.clone(),
2990 None,
2991 ModuleRecoveryStarted {
2992 module_id: module_instance_id,
2993 },
2994 )
2995 .await;
2996 dbtx.insert_entry(
2997 &ClientModuleRecovery { module_instance_id },
2998 &ClientModuleRecoveryState { progress },
2999 )
3000 .await;
3001
3002 dbtx.commit_tx().await;
3003
3004 debug!(
3005 id = %module_instance_id,
3006 %kind, "Starting new module recovery"
3007 );
3008 Some(start_module_recover_fn(snapshot, progress))
3009 }
3010 } else {
3011 None
3012 };
3013
3014 if let Some((recovery, recovery_progress_rx)) = recovery {
3015 module_recoveries.insert(module_instance_id, recovery);
3016 module_recovery_progress_receivers
3017 .insert(module_instance_id, recovery_progress_rx);
3018 } else {
3019 let module = module_init
3020 .init(
3021 final_client.clone(),
3022 fed_id,
3023 config.global.api_endpoints.len(),
3024 module_config,
3025 db.clone(),
3026 module_instance_id,
3027 common_api_versions.core,
3028 api_version,
3029 root_secret.derive_module_secret(module_instance_id),
3034 notifier.clone(),
3035 api.clone(),
3036 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
3037 task_group.clone(),
3038 )
3039 .await?;
3040
3041 if primary_module_instance == module_instance_id
3042 && !module.supports_being_primary()
3043 {
3044 bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module");
3045 }
3046
3047 modules.register_module(module_instance_id, kind, module);
3048 }
3049 }
3050 modules
3051 };
3052
3053 if init_state.is_pending() && module_recoveries.is_empty() {
3054 let mut dbtx = db.begin_transaction().await;
3055 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
3056 .await;
3057 dbtx.commit_tx().await;
3058 }
3059
3060 let executor = {
3061 let mut executor_builder = Executor::builder();
3062 executor_builder
3063 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
3064
3065 for (module_instance_id, _, module) in modules.iter_modules() {
3066 executor_builder.with_module_dyn(module.context(module_instance_id));
3067 }
3068
3069 for module_instance_id in module_recoveries.keys() {
3070 executor_builder.with_valid_module_id(*module_instance_id);
3071 }
3072
3073 executor_builder.build(db.clone(), notifier, task_group.clone())
3074 };
3075
3076 let recovery_receiver_init_val = module_recovery_progress_receivers
3077 .iter()
3078 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
3079 .collect::<BTreeMap<_, _>>();
3080 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
3081 watch::channel(recovery_receiver_init_val);
3082
3083 let client_inner = Arc::new(Client {
3084 config: RwLock::new(config.clone()),
3085 api_secret,
3086 decoders,
3087 db: db.clone(),
3088 federation_id: fed_id,
3089 federation_config_meta: config.global.meta,
3090 primary_module_instance,
3091 modules,
3092 module_inits: self.module_inits.clone(),
3093 log_ordering_wakeup_tx,
3094 log_event_added_rx,
3095 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
3096 executor,
3097 api,
3098 secp_ctx: Secp256k1::new(),
3099 root_secret,
3100 task_group,
3101 operation_log: OperationLog::new(db.clone()),
3102 client_recovery_progress_receiver,
3103 meta_service: self.meta_service,
3104 connector,
3105 });
3106 client_inner
3107 .task_group
3108 .spawn_cancellable("MetaService::update_continuously", {
3109 let client_inner = client_inner.clone();
3110 async move {
3111 client_inner
3112 .meta_service
3113 .update_continuously(&client_inner)
3114 .await;
3115 }
3116 });
3117
3118 client_inner.task_group.spawn_cancellable(
3119 "update-api-announcements",
3120 run_api_announcement_sync(client_inner.clone()),
3121 );
3122
3123 client_inner.task_group.spawn_cancellable(
3124 "event log ordering task",
3125 run_event_log_ordering_task(
3126 db.clone(),
3127 log_ordering_wakeup_rx,
3128 log_event_added_tx,
3129 log_event_added_transient_tx,
3130 ),
3131 );
3132 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
3133
3134 let client_arc = ClientHandle::new(client_inner);
3135
3136 for (_, _, module) in client_arc.modules.iter_modules() {
3137 module.start().await;
3138 }
3139
3140 final_client.set(client_iface.clone());
3141
3142 if !module_recoveries.is_empty() {
3143 client_arc.spawn_module_recoveries_task(
3144 client_recovery_progress_sender,
3145 module_recoveries,
3146 module_recovery_progress_receivers,
3147 );
3148 }
3149
3150 Ok(client_arc)
3151 }
3152
3153 async fn load_init_state(db: &Database) -> InitState {
3154 let mut dbtx = db.begin_transaction_nc().await;
3155 dbtx.get_value(&ClientInitStateKey)
3156 .await
3157 .unwrap_or_else(|| {
3158 warn!(
3161 target: LOG_CLIENT,
3162 "Client missing ClientRequiresRecovery: assuming complete"
3163 );
3164 db::InitState::Complete(db::InitModeComplete::Fresh)
3165 })
3166 }
3167
3168 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
3169 let mut decoders = client_decoders(
3170 &self.module_inits,
3171 config
3172 .modules
3173 .iter()
3174 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
3175 );
3176
3177 decoders.register_module(
3178 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
3179 ModuleKind::from_static_str("tx_submission"),
3180 tx_submission_sm_decoder(),
3181 );
3182
3183 decoders
3184 }
3185
3186 fn config_decoded(
3187 config: &ClientConfig,
3188 decoders: &ModuleDecoderRegistry,
3189 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
3190 config.clone().redecode_raw(decoders)
3191 }
3192
3193 fn federation_root_secret(
3197 root_secret: &DerivableSecret,
3198 config: &ClientConfig,
3199 ) -> DerivableSecret {
3200 root_secret.federation_key(&config.global.calculate_federation_id())
3201 }
3202
3203 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
3205 self.log_event_added_transient_tx.subscribe()
3206 }
3207}
3208
3209pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
3213 let mut tx = db.begin_transaction_nc().await;
3214 let client_secret = tx.get_value(&EncodedClientSecretKey).await;
3215
3216 match client_secret {
3217 Some(client_secret) => {
3218 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
3219 .map_err(|e| anyhow!("Decoding failed: {e}"))
3220 }
3221 None => bail!("Encoded client secret not present in DB"),
3222 }
3223}
3224
3225pub fn client_decoders<'a>(
3226 registry: &ModuleInitRegistry<DynClientModuleInit>,
3227 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
3228) -> ModuleDecoderRegistry {
3229 let mut modules = BTreeMap::new();
3230 for (id, kind) in module_kinds {
3231 let Some(init) = registry.get(kind) else {
3232 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
3233 continue;
3234 };
3235
3236 modules.insert(
3237 id,
3238 (
3239 kind.clone(),
3240 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
3241 ),
3242 );
3243 }
3244 ModuleDecoderRegistry::from(modules)
3245}