fedimint_client/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::{ffi, marker, ops};
6
7use anyhow::{anyhow, bail};
8use bitcoin::secp256k1::PublicKey;
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_core::config::ClientConfig;
11use fedimint_core::core::{
12    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
13    OperationId,
14};
15use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken};
16use fedimint_core::encoding::{Decodable, Encodable};
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
19use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
20use fedimint_core::task::{MaybeSend, MaybeSync};
21use fedimint_core::util::BoxStream;
22use fedimint_core::{
23    apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send_sync, Amount, OutPoint,
24    TransactionId,
25};
26use futures::Stream;
27use serde::de::DeserializeOwned;
28use serde::Serialize;
29
30use self::init::ClientModuleInit;
31use crate::db::event_log::Event;
32use crate::module::recovery::{DynModuleBackup, ModuleBackup};
33use crate::oplog::{OperationLogEntry, UpdateStreamOrOutcome};
34use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, State};
35use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
36use crate::{
37    oplog, AddStateMachinesResult, Client, ClientStrong, ClientWeak,
38    InstancelessDynClientInputBundle, TransactionUpdates,
39};
40
41pub mod init;
42pub mod recovery;
43
/// A [`ModuleRegistry`] whose entries are [`DynClientModule`]s
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
45
46/// A final, fully initialized [`crate::Client`]
47///
48/// Client modules need to be able to access a `Client` they are a part
49/// of. To break the circular dependency, the final `Client` is passed
50/// after `Client` was built via a shared state.
51#[derive(Clone, Default)]
52pub struct FinalClient(Arc<std::sync::OnceLock<ClientWeak>>);
53
54impl FinalClient {
55    /// Get a temporary [`ClientStrong`]
56    ///
57    /// Care must be taken to not let the user take ownership of this value,
58    /// and not store it elsewhere permanently either, as it could prevent
59    /// the cleanup of the Client.
60    pub(crate) fn get(&self) -> ClientStrong {
61        self.0
62            .get()
63            .expect("client must be already set")
64            .upgrade()
65            .expect("client module context must not be use past client shutdown")
66    }
67
68    pub(crate) fn set(&self, client: ClientWeak) {
69        self.0.set(client).expect("FinalLazyClient already set");
70    }
71}
72
/// A Client context for a [`ClientModule`] `M`
///
/// Client modules can interact with the whole
/// client through this struct.
pub struct ClientContext<M> {
    /// Handle to the owning client; resolved to a strong handle on each use
    client: FinalClient,
    /// Instance id of the module this context belongs to
    module_instance_id: ModuleInstanceId,
    /// Token passed to `global_dbtx` to turn a module-level database
    /// transaction into a global one
    global_dbtx_access_token: GlobalDBTxAccessToken,
    /// The module's own database; `module_db()` asserts that it is isolated
    module_db: Database,
    /// Ties the context to the module type `M` without storing an `M`
    _marker: marker::PhantomData<M>,
}
84
85impl<M> Clone for ClientContext<M> {
86    fn clone(&self) -> Self {
87        Self {
88            client: self.client.clone(),
89            module_db: self.module_db.clone(),
90            module_instance_id: self.module_instance_id,
91            _marker: marker::PhantomData,
92            global_dbtx_access_token: self.global_dbtx_access_token,
93        }
94    }
95}
96
/// A reference back to itself that the module can get from the
/// [`ClientContext`]
///
/// Dereferences to the module `M`; the lifetime `'s` ties the value to the
/// borrow it was created from.
pub struct ClientContextSelfRef<'s, M> {
    // we are OK storing `ClientStrong` here, because of the `'s` preventing `Self` from being
    // stored permanently somewhere
    client: ClientStrong,
    // instance id used to look the module up in `client` when dereferencing
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
106
107impl<M> ops::Deref for ClientContextSelfRef<'_, M>
108where
109    M: ClientModule,
110{
111    type Target = M;
112
113    fn deref(&self) -> &Self::Target {
114        self.client
115            .get_module(self.module_instance_id)
116            .as_any()
117            .downcast_ref::<M>()
118            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
119    }
120}
121
122impl<M> fmt::Debug for ClientContext<M> {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.write_str("ClientContext")
125    }
126}
127
128impl<M> ClientContext<M>
129where
130    M: ClientModule,
131{
132    /// Get a reference back to client module from the [`Self`]
133    ///
134    /// It's often necessary for a client module to "move self"
135    /// by-value, especially due to async lifetimes issues.
136    /// Clients usually work with `&mut self`, which can't really
137    /// work in such context.
138    ///
139    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
140    /// can be used to recover the reference to the module at later
141    /// time.
142    #[allow(clippy::needless_lifetimes)] // just for explicitiness
143    pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
144        ClientContextSelfRef {
145            client: self.client.get(),
146            module_instance_id: self.module_instance_id,
147            _marker: marker::PhantomData,
148        }
149    }
150
151    /// Get a reference to a global Api handle
152    pub fn global_api(&self) -> DynGlobalApi {
153        self.client.get().api_clone()
154    }
155    pub fn decoders(&self) -> ModuleDecoderRegistry {
156        self.client.get().decoders().clone()
157    }
158
159    pub fn input_from_dyn<'i>(
160        &self,
161        input: &'i DynInput,
162    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
163        (input.module_instance_id() == self.module_instance_id).then(|| {
164            input
165                .as_any()
166                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
167                .expect("instance_id just checked")
168        })
169    }
170
171    pub fn output_from_dyn<'o>(
172        &self,
173        output: &'o DynOutput,
174    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
175        (output.module_instance_id() == self.module_instance_id).then(|| {
176            output
177                .as_any()
178                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
179                .expect("instance_id just checked")
180        })
181    }
182
183    pub fn map_dyn<'s, 'i, 'o, I>(
184        &'s self,
185        typed: impl IntoIterator<Item = I> + 'i,
186    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
187    where
188        I: IntoDynInstance,
189        'i: 'o,
190        's: 'o,
191    {
192        typed.into_iter().map(|i| self.make_dyn(i))
193    }
194
195    /// Turn a typed output into a dyn version
196    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
197        self.make_dyn(output)
198    }
199
200    /// Turn a typed input into a dyn version
201    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
202        self.make_dyn(input)
203    }
204
205    /// Turn a `typed` into a dyn version
206    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
207    where
208        I: IntoDynInstance,
209    {
210        typed.into_dyn(self.module_instance_id)
211    }
212
213    /// Turn a typed [`ClientOutputBundle`] into a dyn version
214    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
215    where
216        O: IntoDynInstance<DynType = DynOutput> + 'static,
217        S: IntoDynInstance<DynType = DynState> + 'static,
218    {
219        self.make_dyn(output)
220    }
221
222    /// Turn a typed [`ClientInputBundle`] into a dyn version
223    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
224    where
225        I: IntoDynInstance<DynType = DynInput> + 'static,
226        S: IntoDynInstance<DynType = DynState> + 'static,
227    {
228        self.make_dyn(inputs)
229    }
230
231    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
232    where
233        S: sm::IState + 'static,
234    {
235        DynState::from_typed(self.module_instance_id, sm)
236    }
237
238    /// See [`crate::Client::finalize_and_submit_transaction`]
239    pub async fn finalize_and_submit_transaction<F, Meta>(
240        &self,
241        operation_id: OperationId,
242        operation_type: &str,
243        operation_meta: F,
244        tx_builder: TransactionBuilder,
245    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
246    where
247        F: Fn(TransactionId, Vec<OutPoint>) -> Meta + Clone + MaybeSend + MaybeSync,
248        Meta: serde::Serialize + MaybeSend,
249    {
250        self.client
251            .get()
252            .finalize_and_submit_transaction(
253                operation_id,
254                operation_type,
255                operation_meta,
256                tx_builder,
257            )
258            .await
259    }
260
261    /// See [`crate::Client::transaction_updates`]
262    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
263        self.client.get().transaction_updates(operation_id).await
264    }
265
266    /// See [`crate::Client::await_primary_module_outputs`]
267    pub async fn await_primary_module_outputs(
268        &self,
269        operation_id: OperationId,
270        outputs: Vec<OutPoint>,
271    ) -> anyhow::Result<()> {
272        self.client
273            .get()
274            .await_primary_module_outputs(operation_id, outputs)
275            .await
276    }
277
278    // TODO: unify with `Self::get_operation`
279    pub async fn get_operation(
280        &self,
281        operation_id: OperationId,
282    ) -> anyhow::Result<oplog::OperationLogEntry> {
283        let operation = self
284            .client
285            .get()
286            .operation_log()
287            .get_operation(operation_id)
288            .await
289            .ok_or(anyhow::anyhow!("Operation not found"))?;
290
291        if operation.operation_module_kind() != M::kind().as_str() {
292            bail!("Operation is not a lightning operation");
293        }
294
295        Ok(operation)
296    }
297
298    /// Get global db.
299    ///
300    /// Only intended for internal use (private).
301    fn global_db(&self) -> fedimint_core::db::Database {
302        let db = self.client.get().db().clone();
303
304        db.ensure_global()
305            .expect("global_db must always return a global db");
306
307        db
308    }
309
310    pub fn module_db(&self) -> &Database {
311        self.module_db
312            .ensure_isolated()
313            .expect("module_db must always return isolated db");
314        &self.module_db
315    }
316
317    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
318        self.client.get().has_active_states(op_id).await
319    }
320
321    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
322        self.client.get().operation_exists(op_id).await
323    }
324
325    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
326        self.client
327            .get()
328            .executor
329            .get_active_states()
330            .await
331            .into_iter()
332            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
333            .map(|s| {
334                (
335                    s.0.as_any()
336                        .downcast_ref::<M::States>()
337                        .expect("incorrect output type passed to module plugin")
338                        .clone(),
339                    s.1,
340                )
341            })
342            .collect()
343    }
344
345    pub async fn get_config(&self) -> ClientConfig {
346        self.client.get().config().await
347    }
348
349    /// Returns an invite code for the federation that points to an arbitrary
350    /// guardian server for fetching the config
351    pub async fn get_invite_code(&self) -> InviteCode {
352        let cfg = self.get_config().await.global;
353        self.client
354            .get()
355            .invite_code(
356                *cfg.api_endpoints
357                    .keys()
358                    .next()
359                    .expect("A federation always has at least one guardian"),
360            )
361            .await
362            .expect("The guardian we requested an invite code for exists")
363    }
364
365    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
366        self.client.get().get_internal_payment_markers()
367    }
368
369    /// This method starts n state machines with given operation id without a
370    /// corresponding transaction
371    pub async fn manual_operation_start(
372        &self,
373        operation_id: OperationId,
374        op_type: &str,
375        operation_meta: impl serde::Serialize + Debug,
376        sms: Vec<DynState>,
377    ) -> anyhow::Result<()> {
378        let db = self.module_db();
379        let mut dbtx = db.begin_transaction().await;
380        {
381            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
382
383            self.manual_operation_start_inner(
384                &mut dbtx.to_ref_nc(),
385                operation_id,
386                op_type,
387                operation_meta,
388                sms,
389            )
390            .await?;
391        }
392
393        dbtx.commit_tx_result().await.map_err(|_| {
394            anyhow!(
395                "Operation with id {} already exists",
396                operation_id.fmt_short()
397            )
398        })?;
399
400        Ok(())
401    }
402
403    pub async fn manual_operation_start_dbtx(
404        &self,
405        dbtx: &mut DatabaseTransaction<'_>,
406        operation_id: OperationId,
407        op_type: &str,
408        operation_meta: impl serde::Serialize + Debug,
409        sms: Vec<DynState>,
410    ) -> anyhow::Result<()> {
411        self.manual_operation_start_inner(
412            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
413            operation_id,
414            op_type,
415            operation_meta,
416            sms,
417        )
418        .await
419    }
420
421    /// See [`Self::manual_operation_start`], just inside a database
422    /// transaction.
423    async fn manual_operation_start_inner(
424        &self,
425        dbtx: &mut DatabaseTransaction<'_>,
426        operation_id: OperationId,
427        op_type: &str,
428        operation_meta: impl serde::Serialize + Debug,
429        sms: Vec<DynState>,
430    ) -> anyhow::Result<()> {
431        dbtx.ensure_global()
432            .expect("Must deal with global dbtx here");
433
434        if Client::operation_exists_dbtx(&mut dbtx.to_ref_nc(), operation_id).await {
435            bail!(
436                "Operation with id {} already exists",
437                operation_id.fmt_short()
438            );
439        }
440
441        self.client
442            .get()
443            .operation_log
444            .add_operation_log_entry(&mut dbtx.to_ref_nc(), operation_id, op_type, operation_meta)
445            .await;
446
447        self.client
448            .get()
449            .executor
450            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
451            .await
452            .expect("State machine is valid");
453
454        Ok(())
455    }
456
457    pub fn outcome_or_updates<U, S>(
458        &self,
459        operation: &OperationLogEntry,
460        operation_id: OperationId,
461        stream_gen: impl FnOnce() -> S,
462    ) -> UpdateStreamOrOutcome<U>
463    where
464        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
465        S: Stream<Item = U> + MaybeSend + 'static,
466    {
467        operation.outcome_or_updates(&self.global_db(), operation_id, stream_gen)
468    }
469
470    pub async fn claim_inputs<I, S>(
471        &self,
472        dbtx: &mut DatabaseTransaction<'_>,
473        inputs: ClientInputBundle<I, S>,
474        operation_id: OperationId,
475    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
476    where
477        I: IInput + MaybeSend + MaybeSync + 'static,
478        S: sm::IState + MaybeSend + MaybeSync + 'static,
479    {
480        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
481            .await
482    }
483
484    async fn claim_inputs_dyn(
485        &self,
486        dbtx: &mut DatabaseTransaction<'_>,
487        inputs: InstancelessDynClientInputBundle,
488        operation_id: OperationId,
489    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
490        let tx_builder =
491            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
492
493        self.client
494            .get()
495            .finalize_and_submit_transaction_inner(
496                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
497                operation_id,
498                tx_builder,
499            )
500            .await
501    }
502
503    pub async fn add_state_machines_dbtx(
504        &self,
505        dbtx: &mut DatabaseTransaction<'_>,
506        states: Vec<DynState>,
507    ) -> AddStateMachinesResult {
508        self.client
509            .get()
510            .executor
511            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
512            .await
513    }
514
515    pub async fn add_operation_log_entry_dbtx(
516        &self,
517        dbtx: &mut DatabaseTransaction<'_>,
518        operation_id: OperationId,
519        operation_type: &str,
520        operation_meta: impl serde::Serialize,
521    ) {
522        self.client
523            .get()
524            .operation_log()
525            .add_operation_log_entry(
526                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
527                operation_id,
528                operation_type,
529                operation_meta,
530            )
531            .await;
532    }
533
534    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
535    where
536        E: Event + Send,
537        Cap: Send,
538    {
539        self.client
540            .get()
541            .log_event_dbtx(
542                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
543                Some(self.module_instance_id),
544                event,
545            )
546            .await;
547    }
548}
549
550/// Fedimint module client
551#[apply(async_trait_maybe_send!)]
552pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
553    type Init: ClientModuleInit;
554
555    /// Common module types shared between client and server
556    type Common: ModuleCommon;
557
558    /// Data stored in regular backups so that restoring doesn't have to start
559    /// from epoch 0
560    type Backup: ModuleBackup;
561
562    /// Data and API clients available to state machine transitions of this
563    /// module
564    type ModuleStateMachineContext: Context;
565
566    /// All possible states this client can submit to the executor
567    type States: State<ModuleContext = Self::ModuleStateMachineContext>
568        + IntoDynInstance<DynType = DynState>;
569
570    fn decoder() -> Decoder {
571        let mut decoder_builder = Self::Common::decoder_builder();
572        decoder_builder.with_decodable_type::<Self::States>();
573        decoder_builder.with_decodable_type::<Self::Backup>();
574        decoder_builder.build()
575    }
576
577    fn kind() -> ModuleKind {
578        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
579    }
580
581    fn context(&self) -> Self::ModuleStateMachineContext;
582
583    /// Initialize client.
584    ///
585    /// Called by the core client code on start, after [`ClientContext`] is
586    /// fully initialized, so unlike during [`ClientModuleInit::init`],
587    /// access to global client is allowed.
588    async fn start(&self) {}
589
590    async fn handle_cli_command(
591        &self,
592        _args: &[ffi::OsString],
593    ) -> anyhow::Result<serde_json::Value> {
594        Err(anyhow::format_err!(
595            "This module does not implement cli commands"
596        ))
597    }
598
599    async fn handle_rpc(
600        &self,
601        _method: String,
602        _request: serde_json::Value,
603    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
604        Box::pin(futures::stream::once(std::future::ready(Err(
605            anyhow::format_err!("This module does not implement rpc"),
606        ))))
607    }
608
609    /// Returns the fee the processing of this input requires.
610    ///
611    /// If the semantics of a given input aren't known this function returns
612    /// `None`, this only happens if a future version of Fedimint introduces a
613    /// new input variant. For clients this should only be the case when
614    /// processing transactions created by other users, so the result of
615    /// this function can be `unwrap`ped whenever dealing with inputs
616    /// generated by ourselves.
617    fn input_fee(
618        &self,
619        amount: Amount,
620        input: &<Self::Common as ModuleCommon>::Input,
621    ) -> Option<Amount>;
622
623    /// Returns the fee the processing of this output requires.
624    ///
625    /// If the semantics of a given output aren't known this function returns
626    /// `None`, this only happens if a future version of Fedimint introduces a
627    /// new output variant. For clients this should only be the case when
628    /// processing transactions created by other users, so the result of
629    /// this function can be `unwrap`ped whenever dealing with inputs
630    /// generated by ourselves.
631    fn output_fee(
632        &self,
633        amount: Amount,
634        output: &<Self::Common as ModuleCommon>::Output,
635    ) -> Option<Amount>;
636
637    fn supports_backup(&self) -> bool {
638        false
639    }
640
641    async fn backup(&self) -> anyhow::Result<Self::Backup> {
642        anyhow::bail!("Backup not supported");
643    }
644
645    /// Does this module support being a primary module
646    ///
647    /// If it does it must implement:
648    ///
649    /// * [`Self::create_final_inputs_and_outputs`]
650    /// * [`Self::await_primary_module_output`]
651    /// * [`Self::get_balance`]
652    /// * [`Self::subscribe_balance_changes`]
653    fn supports_being_primary(&self) -> bool {
654        false
655    }
656
657    /// Creates all inputs and outputs necessary to balance the transaction.
658    /// The function returns an error if and only if the client's funds are not
659    /// sufficient to create the inputs necessary to fully fund the transaction.
660    ///
661    /// A returned input also contains:
662    /// * A set of private keys belonging to the input for signing the
663    ///   transaction
664    /// * A closure that generates states belonging to the input. This closure
665    ///   takes the transaction id of the transaction in which the input was
666    ///   used and the input index as input since these cannot be known at time
667    ///   of calling `create_funding_input` and have to be injected later.
668    ///
669    /// A returned output also contains:
670    /// * A closure that generates states belonging to the output. This closure
671    ///   takes the transaction id of the transaction in which the output was
672    ///   used and the output index as input since these cannot be known at time
673    ///   of calling `create_change_output` and have to be injected later.
674
675    async fn create_final_inputs_and_outputs(
676        &self,
677        _dbtx: &mut DatabaseTransaction<'_>,
678        _operation_id: OperationId,
679        _input_amount: Amount,
680        _output_amount: Amount,
681    ) -> anyhow::Result<(
682        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
683        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
684    )> {
685        unimplemented!()
686    }
687
688    /// Waits for the funds from an output created by
689    /// [`Self::create_final_inputs_and_outputs`] to become available. This
690    /// function returning typically implies a change in the output of
691    /// [`Self::get_balance`].
692    async fn await_primary_module_output(
693        &self,
694        _operation_id: OperationId,
695        _out_point: OutPoint,
696    ) -> anyhow::Result<()> {
697        unimplemented!()
698    }
699
700    /// Returns the balance held by this module and available for funding
701    /// transactions.
702    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
703        unimplemented!()
704    }
705
706    /// Returns a stream that will output the updated module balance each time
707    /// it changes.
708    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
709        unimplemented!()
710    }
711
712    /// Leave the federation
713    ///
714    /// While technically there's nothing stopping the client from just
715    /// abandoning Federation at any point by deleting all the related
716    /// local data, it is useful to make sure it's safe beforehand.
717    ///
718    /// This call indicates the desire of the caller client code
719    /// to orderly and safely leave the Federation by this module instance.
720    /// The goal of the implementations is to fulfil that wish,
721    /// giving prompt and informative feedback if it's not yet possible.
722    ///
723    /// The client module implementation should handle the request
724    /// and return as fast as possible avoiding blocking for longer than
725    /// necessary. This would usually involve some combination of:
726    ///
727    /// * recording the state of being in process of leaving the Federation to
728    ///   prevent initiating new conditions that could delay its completion;
729    /// * performing any fast to complete cleanup/exit logic;
730    /// * initiating any time-consuming logic (e.g. canceling outstanding
731    ///   contracts), as background jobs, tasks machines, etc.
732    /// * checking for any conditions indicating it might not be safe to leave
733    ///   at the moment.
734    ///
735    /// This function should return `Ok` only if from the perspective
736    /// of this module instance, it is safe to delete client data and
737    /// stop using it, with no further actions (like background jobs) required
738    /// to complete.
739    ///
740    /// This function should return an error if it's not currently possible
741    /// to safely (e.g. without loosing funds) leave the Federation.
742    /// It should avoid running indefinitely trying to complete any cleanup
743    /// actions necessary to reach a clean state, preferring spawning new
744    /// state machines and returning an informative error about cleanup
745    /// still in progress.
746    ///
747    /// If any internal task needs to complete, any user action is required,
748    /// or even external condition needs to be met this function
749    /// should return a `Err`.
750    ///
751    /// Notably modules should not disable interaction that might be necessary
752    /// for the user (possibly through other modules) to leave the Federation.
753    /// In particular a Mint module should retain ability to create new notes,
754    /// and LN module should retain ability to send funds out.
755    ///
756    /// Calling code must NOT assume that a module that once returned `Ok`,
757    /// will not return `Err` at later point. E.g. a Mint module might have
758    /// no outstanding balance at first, but other modules winding down
759    /// might "cash-out" to Ecash.
760    ///
761    /// Before leaving the Federation and deleting any state the calling code
762    /// must collect a full round of `Ok` from all the modules.
763    ///
764    /// Calling code should allow the user to override and ignore any
765    /// outstanding errors, after sufficient amount of warnings. Ideally,
766    /// this should be done on per-module basis, to avoid mistakes.
767    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
768        bail!("Unable to determine if safe to leave the federation: Not implemented")
769    }
770}
771
/// Type-erased version of [`ClientModule`]
///
/// Every [`ClientModule`] implements this trait through a blanket impl in
/// this file. That impl forwards to the typed methods and converts between
/// the typed and dyn values; the method notes below describe what it does.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Expose the module as `Any`, so callers can downcast to the concrete
    /// module type
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// See [`ClientModule::decoder`]
    fn decoder(&self) -> Decoder;

    /// See [`ClientModule::context`]; the typed context gets wrapped into a
    /// [`DynContext`] for module instance `instance`
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// See [`ClientModule::start`]
    async fn start(&self);

    /// See [`ClientModule::handle_cli_command`]
    async fn handle_cli_command(&self, args: &[ffi::OsString])
        -> anyhow::Result<serde_json::Value>;

    /// See [`ClientModule::handle_rpc`]
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// See [`ClientModule::input_fee`]
    ///
    /// The blanket impl downcasts `input` to the module's typed input and
    /// panics if it is of a different type.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;

    /// See [`ClientModule::output_fee`]
    ///
    /// The blanket impl downcasts `output` to the module's typed output and
    /// panics if it is of a different type.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;

    /// See [`ClientModule::supports_backup`]
    fn supports_backup(&self) -> bool;

    /// See [`ClientModule::backup`]; the typed backup gets wrapped into a
    /// [`DynModuleBackup`] for `module_instance_id`
    async fn backup(&self, module_instance_id: ModuleInstanceId)
        -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`]
    fn supports_being_primary(&self) -> bool;

    /// See [`ClientModule::create_final_inputs_and_outputs`]
    ///
    /// In the blanket impl the typed method receives `dbtx` narrowed via
    /// `to_ref_with_prefix_module_id(module_instance)`, and the returned
    /// bundles are converted to their dyn versions for `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`]
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// See [`ClientModule::get_balance`]
    ///
    /// In the blanket impl the typed method receives `dbtx` narrowed via
    /// `to_ref_with_prefix_module_id(module_instance)`.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`]
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
826
827#[apply(async_trait_maybe_send!)]
828impl<T> IClientModule for T
829where
830    T: ClientModule,
831{
832    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
833        self
834    }
835
836    fn decoder(&self) -> Decoder {
837        T::decoder()
838    }
839
840    fn context(&self, instance: ModuleInstanceId) -> DynContext {
841        DynContext::from_typed(instance, <T as ClientModule>::context(self))
842    }
843
844    async fn start(&self) {
845        <T as ClientModule>::start(self).await;
846    }
847
848    async fn handle_cli_command(
849        &self,
850        args: &[ffi::OsString],
851    ) -> anyhow::Result<serde_json::Value> {
852        <T as ClientModule>::handle_cli_command(self, args).await
853    }
854
855    async fn handle_rpc(
856        &self,
857        method: String,
858        request: serde_json::Value,
859    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
860        <T as ClientModule>::handle_rpc(self, method, request).await
861    }
862
863    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
864        <T as ClientModule>::input_fee(
865            self,
866            amount,
867            input
868                .as_any()
869                .downcast_ref()
870                .expect("Dispatched to correct module"),
871        )
872    }
873
874    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
875        <T as ClientModule>::output_fee(
876            self,
877            amount,
878            output
879                .as_any()
880                .downcast_ref()
881                .expect("Dispatched to correct module"),
882        )
883    }
884
885    fn supports_backup(&self) -> bool {
886        <T as ClientModule>::supports_backup(self)
887    }
888
889    async fn backup(
890        &self,
891        module_instance_id: ModuleInstanceId,
892    ) -> anyhow::Result<DynModuleBackup> {
893        Ok(DynModuleBackup::from_typed(
894            module_instance_id,
895            <T as ClientModule>::backup(self).await?,
896        ))
897    }
898
899    fn supports_being_primary(&self) -> bool {
900        <T as ClientModule>::supports_being_primary(self)
901    }
902
903    async fn create_final_inputs_and_outputs(
904        &self,
905        module_instance: ModuleInstanceId,
906        dbtx: &mut DatabaseTransaction<'_>,
907        operation_id: OperationId,
908        input_amount: Amount,
909        output_amount: Amount,
910    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
911        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
912            self,
913            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
914            operation_id,
915            input_amount,
916            output_amount,
917        )
918        .await?;
919
920        let inputs = inputs.into_dyn(module_instance);
921
922        let outputs = outputs.into_dyn(module_instance);
923
924        Ok((inputs, outputs))
925    }
926
927    async fn await_primary_module_output(
928        &self,
929        operation_id: OperationId,
930        out_point: OutPoint,
931    ) -> anyhow::Result<()> {
932        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
933    }
934
935    async fn get_balance(
936        &self,
937        module_instance: ModuleInstanceId,
938        dbtx: &mut DatabaseTransaction<'_>,
939    ) -> Amount {
940        <T as ClientModule>::get_balance(
941            self,
942            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
943        )
944        .await
945    }
946
947    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
948        <T as ClientModule>::subscribe_balance_changes(self).await
949    }
950}
951
952dyn_newtype_define!(
953    #[derive(Clone)]
954    pub DynClientModule(Arc<IClientModule>)
955);
956
957impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
958    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
959        self.inner.as_ref()
960    }
961}
962
/// A contiguous range of input/output indexes
///
/// Covers every index from `start` up to and including `end_inclusive`.
#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    /// First index in the range
    start: u64,
    /// Last index in the range; this bound itself is part of the range
    end_inclusive: u64,
}
969
970impl IdxRange {
971    pub fn new_single(start: u64) -> Self {
972        Self {
973            start,
974            end_inclusive: start,
975        }
976    }
977
978    pub fn start(self) -> u64 {
979        self.start
980    }
981
982    pub fn count(self) -> usize {
983        self.into_iter().count()
984    }
985}
986
987impl IntoIterator for IdxRange {
988    type Item = u64;
989
990    type IntoIter = ops::RangeInclusive<u64>;
991
992    fn into_iter(self) -> Self::IntoIter {
993        ops::RangeInclusive::new(self.start, self.end_inclusive)
994    }
995}
996
997impl From<ops::RangeInclusive<u64>> for IdxRange {
998    fn from(value: ops::RangeInclusive<u64>) -> Self {
999        Self {
1000            start: *value.start(),
1001            end_inclusive: *value.end(),
1002        }
1003    }
1004}
1005
/// A range of [`OutPoint`]s that share a single transaction id
///
/// Iterating yields one `OutPoint` per index in `idx_range`, each carrying
/// `txid`.
#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Transaction id shared by every out point in the range
    txid: TransactionId,
    /// Output indexes covered by the range
    idx_range: IdxRange,
}
1011
1012impl OutPointRange {
1013    pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1014        Self { txid, idx_range }
1015    }
1016
1017    pub fn new_single(txid: TransactionId, idx: u64) -> Self {
1018        Self {
1019            txid,
1020            idx_range: IdxRange::new_single(idx),
1021        }
1022    }
1023
1024    pub fn start_idx(self) -> u64 {
1025        self.idx_range.start()
1026    }
1027
1028    pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1029        self.idx_range.into_iter()
1030    }
1031
1032    pub fn count(self) -> usize {
1033        self.idx_range.count()
1034    }
1035}
1036
1037impl IntoIterator for OutPointRange {
1038    type Item = OutPoint;
1039
1040    type IntoIter = OutPointRangeIter;
1041
1042    fn into_iter(self) -> Self::IntoIter {
1043        OutPointRangeIter {
1044            txid: self.txid,
1045            inner: self.idx_range.into_iter(),
1046        }
1047    }
1048}
1049
1050pub struct OutPointRangeIter {
1051    txid: TransactionId,
1052
1053    inner: ops::RangeInclusive<u64>,
1054}
1055
1056impl OutPointRange {
1057    pub fn txid(&self) -> TransactionId {
1058        self.txid
1059    }
1060}
1061
1062impl Iterator for OutPointRangeIter {
1063    type Item = OutPoint;
1064
1065    fn next(&mut self) -> Option<Self::Item> {
1066        self.inner.next().map(|idx| OutPoint {
1067            txid: self.txid,
1068            out_idx: idx,
1069        })
1070    }
1071}
1072
/// Shared, reference-counted closure that takes an [`OutPointRange`] and
/// returns a `Vec` of `S` values.
///
/// NOTE(review): judging by the name, `S` is presumably a state machine state
/// generated for the outputs in the range — confirm against callers.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;