fedimint_client/module/
init.rs

1pub mod recovery;
2
3use std::collections::BTreeMap;
4use std::fmt::Debug;
5use std::marker;
6use std::sync::Arc;
7
8use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
9use fedimint_core::config::{ClientModuleConfig, FederationId, ModuleInitRegistry};
10use fedimint_core::core::{Decoder, ModuleInstanceId, ModuleKind};
11use fedimint_core::db::{Database, DatabaseVersion};
12use fedimint_core::module::{
13    ApiAuth, ApiVersion, CommonModuleInit, IDynCommonModuleInit, ModuleInit, MultiApiVersion,
14};
15use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup};
16use fedimint_core::{apply, async_trait_maybe_send, dyn_newtype_define, NumPeers};
17use fedimint_derive_secret::DerivableSecret;
18use fedimint_logging::LOG_CLIENT;
19use tokio::sync::watch;
20use tracing::warn;
21
22use super::recovery::{DynModuleBackup, RecoveryProgress};
23use super::{ClientContext, FinalClientIface};
24use crate::db::ClientMigrationFn;
25use crate::module::{ClientModule, DynClientModule};
26use crate::sm::{ModuleNotifier, Notifier};
27
28pub type ClientModuleInitRegistry = ModuleInitRegistry<DynClientModuleInit>;
29
30pub struct ClientModuleInitArgs<C>
31where
32    C: ClientModuleInit,
33{
34    federation_id: FederationId,
35    peer_num: usize,
36    cfg: <<C as ModuleInit>::Common as CommonModuleInit>::ClientConfig,
37    db: Database,
38    core_api_version: ApiVersion,
39    module_api_version: ApiVersion,
40    module_root_secret: DerivableSecret,
41    notifier: ModuleNotifier<<<C as ClientModuleInit>::Module as ClientModule>::States>,
42    api: DynGlobalApi,
43    admin_auth: Option<ApiAuth>,
44    module_api: DynModuleApi,
45    context: ClientContext<<C as ClientModuleInit>::Module>,
46    task_group: TaskGroup,
47}
48
49impl<C> ClientModuleInitArgs<C>
50where
51    C: ClientModuleInit,
52{
53    pub fn federation_id(&self) -> &FederationId {
54        &self.federation_id
55    }
56
57    pub fn peer_num(&self) -> usize {
58        self.peer_num
59    }
60
61    pub fn cfg(&self) -> &<<C as ModuleInit>::Common as CommonModuleInit>::ClientConfig {
62        &self.cfg
63    }
64
65    pub fn db(&self) -> &Database {
66        &self.db
67    }
68
69    pub fn core_api_version(&self) -> &ApiVersion {
70        &self.core_api_version
71    }
72
73    pub fn module_api_version(&self) -> &ApiVersion {
74        &self.module_api_version
75    }
76
77    pub fn module_root_secret(&self) -> &DerivableSecret {
78        &self.module_root_secret
79    }
80
81    pub fn notifier(
82        &self,
83    ) -> &ModuleNotifier<<<C as ClientModuleInit>::Module as ClientModule>::States> {
84        &self.notifier
85    }
86
87    pub fn api(&self) -> &DynGlobalApi {
88        &self.api
89    }
90
91    pub fn admin_auth(&self) -> Option<&ApiAuth> {
92        self.admin_auth.as_ref()
93    }
94
95    pub fn module_api(&self) -> &DynModuleApi {
96        &self.module_api
97    }
98
99    /// Get the [`ClientContext`] for later use
100    ///
101    /// Notably `ClientContext` can not be used during `ClientModuleInit::init`,
102    /// as the outer context is not yet complete. But it can be stored to be
103    /// used in the methods of [`ClientModule`], at which point it will be
104    /// ready.
105    pub fn context(&self) -> ClientContext<<C as ClientModuleInit>::Module> {
106        self.context.clone()
107    }
108
109    pub fn task_group(&self) -> &TaskGroup {
110        &self.task_group
111    }
112}
113
114pub struct ClientModuleRecoverArgs<C>
115where
116    C: ClientModuleInit,
117{
118    federation_id: FederationId,
119    num_peers: NumPeers,
120    cfg: <<C as ModuleInit>::Common as CommonModuleInit>::ClientConfig,
121    db: Database,
122    core_api_version: ApiVersion,
123    module_api_version: ApiVersion,
124    module_root_secret: DerivableSecret,
125    notifier: ModuleNotifier<<<C as ClientModuleInit>::Module as ClientModule>::States>,
126    api: DynGlobalApi,
127    admin_auth: Option<ApiAuth>,
128    module_api: DynModuleApi,
129    context: ClientContext<<C as ClientModuleInit>::Module>,
130    progress_tx: tokio::sync::watch::Sender<RecoveryProgress>,
131    task_group: TaskGroup,
132}
133
134impl<C> ClientModuleRecoverArgs<C>
135where
136    C: ClientModuleInit,
137{
138    pub fn federation_id(&self) -> &FederationId {
139        &self.federation_id
140    }
141
142    pub fn num_peers(&self) -> NumPeers {
143        self.num_peers
144    }
145
146    pub fn cfg(&self) -> &<<C as ModuleInit>::Common as CommonModuleInit>::ClientConfig {
147        &self.cfg
148    }
149
150    pub fn db(&self) -> &Database {
151        &self.db
152    }
153
154    pub fn task_group(&self) -> &TaskGroup {
155        &self.task_group
156    }
157
158    pub fn core_api_version(&self) -> &ApiVersion {
159        &self.core_api_version
160    }
161
162    pub fn module_api_version(&self) -> &ApiVersion {
163        &self.module_api_version
164    }
165
166    pub fn module_root_secret(&self) -> &DerivableSecret {
167        &self.module_root_secret
168    }
169
170    pub fn notifier(
171        &self,
172    ) -> &ModuleNotifier<<<C as ClientModuleInit>::Module as ClientModule>::States> {
173        &self.notifier
174    }
175
176    pub fn api(&self) -> &DynGlobalApi {
177        &self.api
178    }
179
180    pub fn admin_auth(&self) -> Option<&ApiAuth> {
181        self.admin_auth.as_ref()
182    }
183
184    pub fn module_api(&self) -> &DynModuleApi {
185        &self.module_api
186    }
187
188    /// Get the [`ClientContext`]
189    ///
190    /// Notably `ClientContext`, unlike [`ClientModuleInitArgs::context`],
191    /// the client context is guaranteed to be usable immediately.
192    pub fn context(&self) -> ClientContext<<C as ClientModuleInit>::Module> {
193        self.context.clone()
194    }
195
196    pub fn update_recovery_progress(&self, progress: RecoveryProgress) {
197        if progress.is_done() {
198            // Recovery is complete when the recovery function finishes. To avoid
199            // confusing any downstream code, we never send completed process.
200            warn!(target: LOG_CLIENT, "Module trying to send a completed recovery progress. Ignoring");
201        } else if progress.is_none() {
202            // Recovery starts with "none" none progress. To avoid
203            // confusing any downstream code, we never send none process afterwards.
204            warn!(target: LOG_CLIENT, "Module trying to send a none recovery progress. Ignoring");
205        } else if self.progress_tx.send(progress).is_err() {
206            warn!(target: LOG_CLIENT, "Module trying to send a recovery progress but nothing is listening");
207        }
208    }
209}
210
211#[apply(async_trait_maybe_send!)]
212pub trait ClientModuleInit: ModuleInit + Sized {
213    type Module: ClientModule;
214
215    /// Api versions of the corresponding server side module's API
216    /// that this client module implementation can use.
217    fn supported_api_versions(&self) -> MultiApiVersion;
218
219    fn kind() -> ModuleKind {
220        <Self::Module as ClientModule>::kind()
221    }
222
223    /// Recover the state of the client module, optionally from an existing
224    /// snapshot.
225    ///
226    /// If `Err` is returned, the higher level client/application might try
227    /// again at a different time (client restarted, code version changed, etc.)
228    async fn recover(
229        &self,
230        _args: &ClientModuleRecoverArgs<Self>,
231        _snapshot: Option<&<Self::Module as ClientModule>::Backup>,
232    ) -> anyhow::Result<()> {
233        warn!(
234            target: LOG_CLIENT,
235            kind = %<Self::Module as ClientModule>::kind(),
236            "Module does not support recovery, completing without doing anything"
237        );
238        Ok(())
239    }
240
241    /// Initialize a [`ClientModule`] instance from its config
242    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module>;
243
244    /// Retrieves the database migrations from the module to be applied to the
245    /// database before the module is initialized. The database migrations map
246    /// is indexed on the "from" version.
247    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientMigrationFn> {
248        BTreeMap::new()
249    }
250}
251
252#[apply(async_trait_maybe_send!)]
253pub trait IClientModuleInit: IDynCommonModuleInit + Debug + MaybeSend + MaybeSync {
254    fn decoder(&self) -> Decoder;
255
256    fn module_kind(&self) -> ModuleKind;
257
258    fn as_common(&self) -> &(dyn IDynCommonModuleInit + Send + Sync + 'static);
259
260    /// See [`ClientModuleInit::supported_api_versions`]
261    fn supported_api_versions(&self) -> MultiApiVersion;
262
263    #[allow(clippy::too_many_arguments)]
264    async fn recover(
265        &self,
266        final_client: FinalClientIface,
267        federation_id: FederationId,
268        num_peers: NumPeers,
269        cfg: ClientModuleConfig,
270        db: Database,
271        instance_id: ModuleInstanceId,
272        core_api_version: ApiVersion,
273        module_api_version: ApiVersion,
274        module_root_secret: DerivableSecret,
275        notifier: Notifier,
276        api: DynGlobalApi,
277        admin_auth: Option<ApiAuth>,
278        snapshot: Option<&DynModuleBackup>,
279        progress_tx: watch::Sender<RecoveryProgress>,
280        task_group: TaskGroup,
281    ) -> anyhow::Result<()>;
282
283    #[allow(clippy::too_many_arguments)]
284    async fn init(
285        &self,
286        final_client: FinalClientIface,
287        federation_id: FederationId,
288        peer_num: usize,
289        cfg: ClientModuleConfig,
290        db: Database,
291        instance_id: ModuleInstanceId,
292        core_api_version: ApiVersion,
293        module_api_version: ApiVersion,
294        module_root_secret: DerivableSecret,
295        notifier: Notifier,
296        api: DynGlobalApi,
297        admin_auth: Option<ApiAuth>,
298        task_group: TaskGroup,
299    ) -> anyhow::Result<DynClientModule>;
300
301    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientMigrationFn>;
302}
303
304#[apply(async_trait_maybe_send!)]
305impl<T> IClientModuleInit for T
306where
307    T: ClientModuleInit + 'static + MaybeSend + Sync,
308{
309    fn decoder(&self) -> Decoder {
310        <<T as ClientModuleInit>::Module as ClientModule>::decoder()
311    }
312
313    fn module_kind(&self) -> ModuleKind {
314        <Self as ModuleInit>::Common::KIND
315    }
316
317    fn as_common(&self) -> &(dyn IDynCommonModuleInit + Send + Sync + 'static) {
318        self
319    }
320
321    fn supported_api_versions(&self) -> MultiApiVersion {
322        <Self as ClientModuleInit>::supported_api_versions(self)
323    }
324
325    async fn recover(
326        &self,
327        final_client: FinalClientIface,
328        federation_id: FederationId,
329        num_peers: NumPeers,
330        cfg: ClientModuleConfig,
331        db: Database,
332        instance_id: ModuleInstanceId,
333        core_api_version: ApiVersion,
334        module_api_version: ApiVersion,
335        module_root_secret: DerivableSecret,
336        // TODO: make dyn type for notifier
337        notifier: Notifier,
338        api: DynGlobalApi,
339        admin_auth: Option<ApiAuth>,
340        snapshot: Option<&DynModuleBackup>,
341        progress_tx: watch::Sender<RecoveryProgress>,
342        task_group: TaskGroup,
343    ) -> anyhow::Result<()> {
344        let typed_cfg: &<<T as fedimint_core::module::ModuleInit>::Common as CommonModuleInit>::ClientConfig = cfg.cast()?;
345        let snapshot: Option<&<<Self as ClientModuleInit>::Module as ClientModule>::Backup> =
346            snapshot.map(|s| {
347                s.as_any()
348                    .downcast_ref()
349                    .expect("can't convert client module backup to desired type")
350            });
351
352        let (module_db, global_dbtx_access_token) = db.with_prefix_module_id(instance_id);
353        Ok(self
354            .recover(
355                &ClientModuleRecoverArgs {
356                    federation_id,
357                    num_peers,
358                    cfg: typed_cfg.clone(),
359                    db: module_db.clone(),
360                    core_api_version,
361                    module_api_version,
362                    module_root_secret,
363                    notifier: notifier.module_notifier(instance_id),
364                    api: api.clone(),
365                    admin_auth,
366                    module_api: api.with_module(instance_id),
367                    context: ClientContext {
368                        client: final_client,
369                        module_instance_id: instance_id,
370                        global_dbtx_access_token,
371                        module_db,
372                        _marker: marker::PhantomData,
373                    },
374                    progress_tx,
375                    task_group,
376                },
377                snapshot,
378            )
379            .await?)
380    }
381
382    async fn init(
383        &self,
384        final_client: FinalClientIface,
385        federation_id: FederationId,
386        peer_num: usize,
387        cfg: ClientModuleConfig,
388        db: Database,
389        instance_id: ModuleInstanceId,
390        core_api_version: ApiVersion,
391        module_api_version: ApiVersion,
392        module_root_secret: DerivableSecret,
393        // TODO: make dyn type for notifier
394        notifier: Notifier,
395        api: DynGlobalApi,
396        admin_auth: Option<ApiAuth>,
397        task_group: TaskGroup,
398    ) -> anyhow::Result<DynClientModule> {
399        let typed_cfg: &<<T as fedimint_core::module::ModuleInit>::Common as CommonModuleInit>::ClientConfig = cfg.cast()?;
400        let (module_db, global_dbtx_access_token) = db.with_prefix_module_id(instance_id);
401        Ok(self
402            .init(&ClientModuleInitArgs {
403                federation_id,
404                peer_num,
405                cfg: typed_cfg.clone(),
406                db: module_db.clone(),
407                core_api_version,
408                module_api_version,
409                module_root_secret,
410                notifier: notifier.module_notifier(instance_id),
411                api: api.clone(),
412                admin_auth,
413                module_api: api.with_module(instance_id),
414                context: ClientContext {
415                    client: final_client,
416                    module_instance_id: instance_id,
417                    module_db,
418                    global_dbtx_access_token,
419                    _marker: marker::PhantomData,
420                },
421                task_group,
422            })
423            .await?
424            .into())
425    }
426
427    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientMigrationFn> {
428        <Self as ClientModuleInit>::get_database_migrations(self)
429    }
430}
431
432dyn_newtype_define!(
433    #[derive(Clone)]
434    pub DynClientModuleInit(Arc<IClientModuleInit>)
435);
436
437impl AsRef<dyn IDynCommonModuleInit + Send + Sync + 'static> for DynClientModuleInit {
438    fn as_ref(&self) -> &(dyn IDynCommonModuleInit + Send + Sync + 'static) {
439        self.inner.as_common()
440    }
441}
442
443impl AsRef<dyn IClientModuleInit + 'static> for DynClientModuleInit {
444    fn as_ref(&self) -> &(dyn IClientModuleInit + 'static) {
445        self.inner.as_ref()
446    }
447}