fedimint_client/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::sync::{Arc, Weak};
6use std::{ffi, marker, ops};
7
8use anyhow::{anyhow, bail};
9use bitcoin::secp256k1::PublicKey;
10use fedimint_api_client::api::DynGlobalApi;
11use fedimint_core::config::ClientConfig;
12use fedimint_core::core::{
13    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
14    OperationId,
15};
16use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
17use fedimint_core::encoding::{Decodable, Encodable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24    apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send_sync, Amount, OutPoint,
25    PeerId, TransactionId,
26};
27use fedimint_eventlog::{Event, EventKind};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{OperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, Executor, State};
38use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
39use crate::{
40    oplog, AddStateMachinesResult, Client, InstancelessDynClientInputBundle, TransactionUpdates,
41};
42
43pub mod init;
44pub mod recovery;
45
/// A [`ModuleRegistry`] of [`DynClientModule`]s
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
/// A fedimint-client interface exposed to client modules
///
/// To break the dependency of the client modules on the whole fedimint client
/// and in particular the `fedimint-client` crate, the module gets access to an
/// interface, that is implemented by the `Client`.
///
/// This allows loose coupling, less recompilation and better control and
/// understanding of what functionality of the Client the modules get access to.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Look up the (type-erased) module for the given instance id
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns a global API handle; backs [`ClientContext::global_api`]
    fn api_clone(&self) -> DynGlobalApi;
    /// Decoders of all modules of the client; backs [`ClientContext::decoders`]
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Backs [`ClientContext::finalize_and_submit_transaction`]
    ///
    /// Here `operation_meta_gen` already produces the operation metadata as
    /// JSON; the typed wrapper takes care of the serialization.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    // TODO: unify
    /// Variant working on a caller-provided database transaction, without
    /// operation type or metadata; backs [`ClientContext::claim_inputs`]
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Backs [`ClientContext::transaction_updates`]
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Backs [`ClientContext::await_primary_module_outputs`]
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// The operation log of the client
    fn operation_log(&self) -> &OperationLog;

    /// Backs [`ClientContext::has_active_states`]
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Backs [`ClientContext::operation_exists`]
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Backs [`ClientContext::get_config`]
    async fn config(&self) -> ClientConfig;

    /// Database of the client
    ///
    /// [`ClientContext`] expects this to be the global database and panics
    /// otherwise.
    fn db(&self) -> &Database;

    /// The state machine executor of the client
    fn executor(&self) -> &Executor;

    /// Invite code pointing at the given `peer`
    ///
    /// NOTE(review): presumably `None` when `peer` is not a known guardian —
    /// confirm against the implementation.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Backs [`ClientContext::get_internal_payment_markers`]
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Log an event whose `payload` was already serialized to JSON; backs
    /// [`ClientContext::log_event`]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    );
}
112
/// A final, fully initialized [`crate::Client`]
///
/// Client modules need to be able to access a `Client` they are a part
/// of. To break the circular dependency, the final `Client` is passed
/// after `Client` was built via a shared state.
///
/// Internally this is a write-once cell shared between all clones. It holds
/// only a [`Weak`] reference, so this handle by itself does not keep the
/// client alive.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
120
121impl FinalClientIface {
122    /// Get a temporary strong reference to [`ClientContextIface`]
123    ///
124    /// Care must be taken to not let the user take ownership of this value,
125    /// and not store it elsewhere permanently either, as it could prevent
126    /// the cleanup of the Client.
127    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
128        self.0
129            .get()
130            .expect("client must be already set")
131            .upgrade()
132            .expect("client module context must not be use past client shutdown")
133    }
134
135    pub(crate) fn set(&self, client: Weak<dyn ClientContextIface>) {
136        self.0.set(client).expect("FinalLazyClient already set");
137    }
138}
139
/// A Client context for a [`ClientModule`] `M`
///
/// Client modules can interact with the whole
/// client through this struct.
pub struct ClientContext<M> {
    /// Late-bound handle to the client this module is a part of
    client: FinalClientIface,
    /// Instance id of the module this context belongs to; used to tag typed
    /// values when converting them to their dyn form
    module_instance_id: ModuleInstanceId,
    /// Token passed to `global_dbtx` to obtain a global database transaction
    /// from a transaction handed in by the module
    global_dbtx_access_token: GlobalDBTxAccessToken,
    /// Database handed out by [`Self::module_db`]; expected to be isolated
    module_db: Database,
    /// Ties the context to the module type `M` without storing a value of it
    _marker: marker::PhantomData<M>,
}
151
152impl<M> Clone for ClientContext<M> {
153    fn clone(&self) -> Self {
154        Self {
155            client: self.client.clone(),
156            module_db: self.module_db.clone(),
157            module_instance_id: self.module_instance_id,
158            _marker: marker::PhantomData,
159            global_dbtx_access_token: self.global_dbtx_access_token,
160        }
161    }
162}
163
/// A reference back to itself that the module can get from the
/// [`ClientContext`]
pub struct ClientContextSelfRef<'s, M> {
    // we are OK storing a strong reference (`Arc`) to the client here, because of the `'s`
    // preventing `Self` from being stored permanently somewhere
    client: Arc<dyn ClientContextIface>,
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
173
174impl<M> ops::Deref for ClientContextSelfRef<'_, M>
175where
176    M: ClientModule,
177{
178    type Target = M;
179
180    fn deref(&self) -> &Self::Target {
181        self.client
182            .get_module(self.module_instance_id)
183            .as_any()
184            .downcast_ref::<M>()
185            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
186    }
187}
188
189impl<M> fmt::Debug for ClientContext<M> {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        f.write_str("ClientContext")
192    }
193}
194
195impl<M> ClientContext<M>
196where
197    M: ClientModule,
198{
199    /// Get a reference back to client module from the [`Self`]
200    ///
201    /// It's often necessary for a client module to "move self"
202    /// by-value, especially due to async lifetimes issues.
203    /// Clients usually work with `&mut self`, which can't really
204    /// work in such context.
205    ///
206    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
207    /// can be used to recover the reference to the module at later
208    /// time.
209    #[allow(clippy::needless_lifetimes)] // just for explicitiness
210    pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
211        ClientContextSelfRef {
212            client: self.client.get(),
213            module_instance_id: self.module_instance_id,
214            _marker: marker::PhantomData,
215        }
216    }
217
218    /// Get a reference to a global Api handle
219    pub fn global_api(&self) -> DynGlobalApi {
220        self.client.get().api_clone()
221    }
222
223    /// A set of all decoders of all modules of the client
224    pub fn decoders(&self) -> ModuleDecoderRegistry {
225        Clone::clone(self.client.get().decoders())
226    }
227
228    pub fn input_from_dyn<'i>(
229        &self,
230        input: &'i DynInput,
231    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
232        (input.module_instance_id() == self.module_instance_id).then(|| {
233            input
234                .as_any()
235                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
236                .unwrap_or_else(|| {
237                    panic!("instance_id {} just checked", input.module_instance_id())
238                })
239        })
240    }
241
242    pub fn output_from_dyn<'o>(
243        &self,
244        output: &'o DynOutput,
245    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
246        (output.module_instance_id() == self.module_instance_id).then(|| {
247            output
248                .as_any()
249                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
250                .unwrap_or_else(|| {
251                    panic!("instance_id {} just checked", output.module_instance_id())
252                })
253        })
254    }
255
256    pub fn map_dyn<'s, 'i, 'o, I>(
257        &'s self,
258        typed: impl IntoIterator<Item = I> + 'i,
259    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
260    where
261        I: IntoDynInstance,
262        'i: 'o,
263        's: 'o,
264    {
265        typed.into_iter().map(|i| self.make_dyn(i))
266    }
267
268    /// Turn a typed output into a dyn version
269    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
270        self.make_dyn(output)
271    }
272
273    /// Turn a typed input into a dyn version
274    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
275        self.make_dyn(input)
276    }
277
278    /// Turn a `typed` into a dyn version
279    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
280    where
281        I: IntoDynInstance,
282    {
283        typed.into_dyn(self.module_instance_id)
284    }
285
286    /// Turn a typed [`ClientOutputBundle`] into a dyn version
287    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
288    where
289        O: IntoDynInstance<DynType = DynOutput> + 'static,
290        S: IntoDynInstance<DynType = DynState> + 'static,
291    {
292        self.make_dyn(output)
293    }
294
295    /// Turn a typed [`ClientInputBundle`] into a dyn version
296    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
297    where
298        I: IntoDynInstance<DynType = DynInput> + 'static,
299        S: IntoDynInstance<DynType = DynState> + 'static,
300    {
301        self.make_dyn(inputs)
302    }
303
304    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
305    where
306        S: sm::IState + 'static,
307    {
308        DynState::from_typed(self.module_instance_id, sm)
309    }
310
311    /// See [`crate::Client::finalize_and_submit_transaction`]
312    pub async fn finalize_and_submit_transaction<F, Meta>(
313        &self,
314        operation_id: OperationId,
315        operation_type: &str,
316        operation_meta_gen: F,
317        tx_builder: TransactionBuilder,
318    ) -> anyhow::Result<OutPointRange>
319    where
320        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
321        Meta: serde::Serialize + MaybeSend,
322    {
323        self.client
324            .get()
325            .finalize_and_submit_transaction(
326                operation_id,
327                operation_type,
328                Box::new(move |out_point_range| {
329                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
330                }),
331                tx_builder,
332            )
333            .await
334    }
335
336    /// See [`crate::Client::transaction_updates`]
337    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
338        self.client.get().transaction_updates(operation_id).await
339    }
340
341    /// See [`crate::Client::await_primary_module_outputs`]
342    pub async fn await_primary_module_outputs(
343        &self,
344        operation_id: OperationId,
345        // TODO: make `impl Iterator<Item = ...>`
346        outputs: Vec<OutPoint>,
347    ) -> anyhow::Result<()> {
348        self.client
349            .get()
350            .await_primary_module_outputs(operation_id, outputs)
351            .await
352    }
353
354    // TODO: unify with `Self::get_operation`
355    pub async fn get_operation(
356        &self,
357        operation_id: OperationId,
358    ) -> anyhow::Result<oplog::OperationLogEntry> {
359        let operation = self
360            .client
361            .get()
362            .operation_log()
363            .get_operation(operation_id)
364            .await
365            .ok_or(anyhow::anyhow!("Operation not found"))?;
366
367        if operation.operation_module_kind() != M::kind().as_str() {
368            bail!("Operation is not a lightning operation");
369        }
370
371        Ok(operation)
372    }
373
374    /// Get global db.
375    ///
376    /// Only intended for internal use (private).
377    fn global_db(&self) -> fedimint_core::db::Database {
378        let db = Clone::clone(self.client.get().db());
379
380        db.ensure_global()
381            .expect("global_db must always return a global db");
382
383        db
384    }
385
386    pub fn module_db(&self) -> &Database {
387        self.module_db
388            .ensure_isolated()
389            .expect("module_db must always return isolated db");
390        &self.module_db
391    }
392
393    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
394        self.client.get().has_active_states(op_id).await
395    }
396
397    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
398        self.client.get().operation_exists(op_id).await
399    }
400
401    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
402        self.client
403            .get()
404            .executor()
405            .get_active_states()
406            .await
407            .into_iter()
408            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
409            .map(|s| {
410                (
411                    Clone::clone(
412                        s.0.as_any()
413                            .downcast_ref::<M::States>()
414                            .expect("incorrect output type passed to module plugin"),
415                    ),
416                    s.1,
417                )
418            })
419            .collect()
420    }
421
422    pub async fn get_config(&self) -> ClientConfig {
423        self.client.get().config().await
424    }
425
426    /// Returns an invite code for the federation that points to an arbitrary
427    /// guardian server for fetching the config
428    pub async fn get_invite_code(&self) -> InviteCode {
429        let cfg = self.get_config().await.global;
430        self.client
431            .get()
432            .invite_code(
433                *cfg.api_endpoints
434                    .keys()
435                    .next()
436                    .expect("A federation always has at least one guardian"),
437            )
438            .await
439            .expect("The guardian we requested an invite code for exists")
440    }
441
442    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
443        self.client.get().get_internal_payment_markers()
444    }
445
446    /// This method starts n state machines with given operation id without a
447    /// corresponding transaction
448    pub async fn manual_operation_start(
449        &self,
450        operation_id: OperationId,
451        op_type: &str,
452        operation_meta: impl serde::Serialize + Debug,
453        sms: Vec<DynState>,
454    ) -> anyhow::Result<()> {
455        let db = self.module_db();
456        let mut dbtx = db.begin_transaction().await;
457        {
458            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
459
460            self.manual_operation_start_inner(
461                &mut dbtx.to_ref_nc(),
462                operation_id,
463                op_type,
464                operation_meta,
465                sms,
466            )
467            .await?;
468        }
469
470        dbtx.commit_tx_result().await.map_err(|_| {
471            anyhow!(
472                "Operation with id {} already exists",
473                operation_id.fmt_short()
474            )
475        })?;
476
477        Ok(())
478    }
479
480    pub async fn manual_operation_start_dbtx(
481        &self,
482        dbtx: &mut DatabaseTransaction<'_>,
483        operation_id: OperationId,
484        op_type: &str,
485        operation_meta: impl serde::Serialize + Debug,
486        sms: Vec<DynState>,
487    ) -> anyhow::Result<()> {
488        self.manual_operation_start_inner(
489            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
490            operation_id,
491            op_type,
492            operation_meta,
493            sms,
494        )
495        .await
496    }
497
498    /// See [`Self::manual_operation_start`], just inside a database
499    /// transaction.
500    async fn manual_operation_start_inner(
501        &self,
502        dbtx: &mut DatabaseTransaction<'_>,
503        operation_id: OperationId,
504        op_type: &str,
505        operation_meta: impl serde::Serialize + Debug,
506        sms: Vec<DynState>,
507    ) -> anyhow::Result<()> {
508        dbtx.ensure_global()
509            .expect("Must deal with global dbtx here");
510
511        if Client::operation_exists_dbtx(&mut dbtx.to_ref_nc(), operation_id).await {
512            bail!(
513                "Operation with id {} already exists",
514                operation_id.fmt_short()
515            );
516        }
517
518        self.client
519            .get()
520            .operation_log()
521            .add_operation_log_entry(&mut dbtx.to_ref_nc(), operation_id, op_type, operation_meta)
522            .await;
523
524        self.client
525            .get()
526            .executor()
527            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
528            .await
529            .expect("State machine is valid");
530
531        Ok(())
532    }
533
534    pub fn outcome_or_updates<U, S>(
535        &self,
536        operation: &OperationLogEntry,
537        operation_id: OperationId,
538        stream_gen: impl FnOnce() -> S,
539    ) -> UpdateStreamOrOutcome<U>
540    where
541        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
542        S: Stream<Item = U> + MaybeSend + 'static,
543    {
544        operation.outcome_or_updates(&self.global_db(), operation_id, stream_gen)
545    }
546
547    pub async fn claim_inputs<I, S>(
548        &self,
549        dbtx: &mut DatabaseTransaction<'_>,
550        inputs: ClientInputBundle<I, S>,
551        operation_id: OperationId,
552    ) -> anyhow::Result<OutPointRange>
553    where
554        I: IInput + MaybeSend + MaybeSync + 'static,
555        S: sm::IState + MaybeSend + MaybeSync + 'static,
556    {
557        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
558            .await
559    }
560
561    async fn claim_inputs_dyn(
562        &self,
563        dbtx: &mut DatabaseTransaction<'_>,
564        inputs: InstancelessDynClientInputBundle,
565        operation_id: OperationId,
566    ) -> anyhow::Result<OutPointRange> {
567        let tx_builder =
568            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
569
570        self.client
571            .get()
572            .finalize_and_submit_transaction_inner(
573                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
574                operation_id,
575                tx_builder,
576            )
577            .await
578    }
579
580    pub async fn add_state_machines_dbtx(
581        &self,
582        dbtx: &mut DatabaseTransaction<'_>,
583        states: Vec<DynState>,
584    ) -> AddStateMachinesResult {
585        self.client
586            .get()
587            .executor()
588            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
589            .await
590    }
591
592    pub async fn add_operation_log_entry_dbtx(
593        &self,
594        dbtx: &mut DatabaseTransaction<'_>,
595        operation_id: OperationId,
596        operation_type: &str,
597        operation_meta: impl serde::Serialize,
598    ) {
599        self.client
600            .get()
601            .operation_log()
602            .add_operation_log_entry(
603                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
604                operation_id,
605                operation_type,
606                operation_meta,
607            )
608            .await;
609    }
610
611    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
612    where
613        E: Event + Send,
614        Cap: Send,
615    {
616        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
617            warn!(
618                target: LOG_CLIENT,
619                module_kind = %<M as ClientModule>::kind(),
620                event_module = ?<E as Event>::MODULE,
621                "Client module logging events of different module than its own. This might become an error in the future."
622            );
623        }
624        self.client
625            .get()
626            .log_event_json(
627                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
628                <E as Event>::MODULE,
629                self.module_instance_id,
630                <E as Event>::KIND,
631                serde_json::to_value(event).expect("Can't fail"),
632                <E as Event>::PERSIST,
633            )
634            .await;
635    }
636}
637
638/// Fedimint module client
639#[apply(async_trait_maybe_send!)]
640pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
    /// Initializer of this module; also determines the module kind returned
    /// by [`Self::kind`]
    type Init: ClientModuleInit;

    /// Common module types shared between client and server
    type Common: ModuleCommon;

    /// Data stored in regular backups so that restoring doesn't have to start
    /// from epoch 0
    type Backup: ModuleBackup;

    /// Data and API clients available to state machine transitions of this
    /// module
    type ModuleStateMachineContext: Context;

    /// All possible states this client can submit to the executor
    type States: State<ModuleContext = Self::ModuleStateMachineContext>
        + IntoDynInstance<DynType = DynState>;
657
658    fn decoder() -> Decoder {
659        let mut decoder_builder = Self::Common::decoder_builder();
660        decoder_builder.with_decodable_type::<Self::States>();
661        decoder_builder.with_decodable_type::<Self::Backup>();
662        decoder_builder.build()
663    }
664
665    fn kind() -> ModuleKind {
666        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
667    }
668
    /// Returns the context made available to state machine transitions of
    /// this module
    fn context(&self) -> Self::ModuleStateMachineContext;
670
    /// Initialize client.
    ///
    /// Called by the core client code on start, after [`ClientContext`] is
    /// fully initialized, so unlike during [`ClientModuleInit::init`],
    /// access to global client is allowed.
    ///
    /// The default implementation does nothing.
    async fn start(&self) {}
677
678    async fn handle_cli_command(
679        &self,
680        _args: &[ffi::OsString],
681    ) -> anyhow::Result<serde_json::Value> {
682        Err(anyhow::format_err!(
683            "This module does not implement cli commands"
684        ))
685    }
686
687    async fn handle_rpc(
688        &self,
689        _method: String,
690        _request: serde_json::Value,
691    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
692        Box::pin(futures::stream::once(std::future::ready(Err(
693            anyhow::format_err!("This module does not implement rpc"),
694        ))))
695    }
696
    /// Returns the fee the processing of this input requires.
    ///
    /// If the semantics of a given input aren't known this function returns
    /// `None`. This only happens if a future version of Fedimint introduces a
    /// new input variant. For clients this should only be the case when
    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with inputs
    /// generated by ourselves.
    fn input_fee(
        &self,
        amount: Amount,
        input: &<Self::Common as ModuleCommon>::Input,
    ) -> Option<Amount>;
710
    /// Returns the fee the processing of this output requires.
    ///
    /// If the semantics of a given output aren't known this function returns
    /// `None`. This only happens if a future version of Fedimint introduces a
    /// new output variant. For clients this should only be the case when
    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with outputs
    /// generated by ourselves.
    fn output_fee(
        &self,
        amount: Amount,
        output: &<Self::Common as ModuleCommon>::Output,
    ) -> Option<Amount>;
724
    /// Does this module support backups
    ///
    /// Defaults to `false`.
    fn supports_backup(&self) -> bool {
        false
    }
728
729    async fn backup(&self) -> anyhow::Result<Self::Backup> {
730        anyhow::bail!("Backup not supported");
731    }
732
    /// Does this module support being a primary module
    ///
    /// If it does it must implement:
    ///
    /// * [`Self::create_final_inputs_and_outputs`]
    /// * [`Self::await_primary_module_output`]
    /// * [`Self::get_balance`]
    /// * [`Self::subscribe_balance_changes`]
    ///
    /// Defaults to `false`; the default implementations of the methods listed
    /// above panic with `unimplemented!()`.
    fn supports_being_primary(&self) -> bool {
        false
    }
744
    /// Creates all inputs and outputs necessary to balance the transaction.
    /// The function returns an error if and only if the client's funds are not
    /// sufficient to create the inputs necessary to fully fund the transaction.
    ///
    /// A returned input also contains:
    /// * A set of private keys belonging to the input for signing the
    ///   transaction
    /// * A closure that generates states belonging to the input. This closure
    ///   takes the transaction id of the transaction in which the input was
    ///   used and the input index as input since these cannot be known at time
    ///   of calling `create_funding_input` and have to be injected later.
    ///
    /// A returned output also contains:
    /// * A closure that generates states belonging to the output. This closure
    ///   takes the transaction id of the transaction in which the output was
    ///   used and the output index as input since these cannot be known at time
    ///   of calling `create_change_output` and have to be injected later.
    ///
    /// NOTE(review): `create_funding_input` and `create_change_output` are not
    /// part of this trait; presumably they are older names for this function
    /// — confirm and update.
    ///
    /// The default implementation panics with `unimplemented!()`; see
    /// [`Self::supports_being_primary`].
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }
775
    /// Waits for the funds from an output created by
    /// [`Self::create_final_inputs_and_outputs`] to become available. This
    /// function returning typically implies a change in the output of
    /// [`Self::get_balance`].
    ///
    /// The default implementation panics with `unimplemented!()`; see
    /// [`Self::supports_being_primary`].
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }
787
    /// Returns the balance held by this module and available for funding
    /// transactions.
    ///
    /// The default implementation panics with `unimplemented!()`; see
    /// [`Self::supports_being_primary`].
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
        unimplemented!()
    }
793
    /// Returns a stream that will output the updated module balance each time
    /// it changes.
    ///
    /// NOTE(review): the stream item type is `()`, so the stream appears to
    /// only signal that a change happened rather than carry the new balance —
    /// confirm and adjust the sentence above.
    ///
    /// The default implementation panics with `unimplemented!()`; see
    /// [`Self::supports_being_primary`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }
799
800    /// Leave the federation
801    ///
802    /// While technically there's nothing stopping the client from just
803    /// abandoning Federation at any point by deleting all the related
804    /// local data, it is useful to make sure it's safe beforehand.
805    ///
806    /// This call indicates the desire of the caller client code
807    /// to orderly and safely leave the Federation by this module instance.
808    /// The goal of the implementations is to fulfil that wish,
809    /// giving prompt and informative feedback if it's not yet possible.
810    ///
811    /// The client module implementation should handle the request
812    /// and return as fast as possible avoiding blocking for longer than
813    /// necessary. This would usually involve some combination of:
814    ///
815    /// * recording the state of being in process of leaving the Federation to
816    ///   prevent initiating new conditions that could delay its completion;
817    /// * performing any fast to complete cleanup/exit logic;
818    /// * initiating any time-consuming logic (e.g. canceling outstanding
819    ///   contracts), as background jobs, tasks machines, etc.
820    /// * checking for any conditions indicating it might not be safe to leave
821    ///   at the moment.
822    ///
823    /// This function should return `Ok` only if from the perspective
824    /// of this module instance, it is safe to delete client data and
825    /// stop using it, with no further actions (like background jobs) required
826    /// to complete.
827    ///
828    /// This function should return an error if it's not currently possible
829    /// to safely (e.g. without loosing funds) leave the Federation.
830    /// It should avoid running indefinitely trying to complete any cleanup
831    /// actions necessary to reach a clean state, preferring spawning new
832    /// state machines and returning an informative error about cleanup
833    /// still in progress.
834    ///
835    /// If any internal task needs to complete, any user action is required,
836    /// or even external condition needs to be met this function
837    /// should return a `Err`.
838    ///
839    /// Notably modules should not disable interaction that might be necessary
840    /// for the user (possibly through other modules) to leave the Federation.
841    /// In particular a Mint module should retain ability to create new notes,
842    /// and LN module should retain ability to send funds out.
843    ///
844    /// Calling code must NOT assume that a module that once returned `Ok`,
845    /// will not return `Err` at later point. E.g. a Mint module might have
846    /// no outstanding balance at first, but other modules winding down
847    /// might "cash-out" to Ecash.
848    ///
849    /// Before leaving the Federation and deleting any state the calling code
850    /// must collect a full round of `Ok` from all the modules.
851    ///
852    /// Calling code should allow the user to override and ignore any
    /// outstanding errors, after a sufficient number of warnings. Ideally,
    /// this should be done on a per-module basis, to avoid mistakes.
855    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
856        bail!("Unable to determine if safe to leave the federation: Not implemented")
857    }
858}
859
/// Type-erased version of [`ClientModule`]
///
/// Every `T: ClientModule` receives this trait through a blanket
/// implementation that forwards each method to its typed counterpart and
/// converts typed values to or from their `Dyn*` wrappers.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns the module as a `dyn Any` reference.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// Returns the module's [`Decoder`].
    fn decoder(&self) -> Decoder;

    /// Returns the module's context wrapped in a [`DynContext`] created for
    /// `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// Type-erased `ClientModule::start`.
    async fn start(&self);

    /// Type-erased `ClientModule::handle_cli_command`: receives the raw
    /// arguments and yields a JSON value or an error.
    async fn handle_cli_command(&self, args: &[ffi::OsString])
        -> anyhow::Result<serde_json::Value>;

    /// Type-erased `ClientModule::handle_rpc`: receives a method name and a
    /// JSON request and yields a stream of JSON results.
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// Type-erased `ClientModule::input_fee`.
    ///
    /// The blanket implementation downcasts `input` to the module's own input
    /// type and panics ("Dispatched to correct module") if that fails.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;

    /// Type-erased `ClientModule::output_fee`.
    ///
    /// The blanket implementation downcasts `output` to the module's own
    /// output type and panics ("Dispatched to correct module") if that fails.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;

    /// Type-erased `ClientModule::supports_backup`.
    fn supports_backup(&self) -> bool;

    /// Type-erased `ClientModule::backup`; the typed backup is wrapped into a
    /// [`DynModuleBackup`] tagged with `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
        -> anyhow::Result<DynModuleBackup>;

    /// Type-erased `ClientModule::supports_being_primary`.
    fn supports_being_primary(&self) -> bool;

    /// Type-erased `ClientModule::create_final_inputs_and_outputs`.
    ///
    /// The blanket implementation passes the typed method a transaction
    /// derived from `dbtx` via `to_ref_with_prefix_module_id(module_instance)`
    /// and converts both returned bundles with `into_dyn(module_instance)`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// Type-erased `ClientModule::await_primary_module_output`.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// Type-erased `ClientModule::get_balance`.
    ///
    /// The blanket implementation passes the typed method a transaction
    /// derived from `dbtx` via `to_ref_with_prefix_module_id(module_instance)`.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount;

    /// Type-erased `ClientModule::subscribe_balance_changes`.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
914
915#[apply(async_trait_maybe_send!)]
916impl<T> IClientModule for T
917where
918    T: ClientModule,
919{
920    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
921        self
922    }
923
924    fn decoder(&self) -> Decoder {
925        T::decoder()
926    }
927
928    fn context(&self, instance: ModuleInstanceId) -> DynContext {
929        DynContext::from_typed(instance, <T as ClientModule>::context(self))
930    }
931
932    async fn start(&self) {
933        <T as ClientModule>::start(self).await;
934    }
935
936    async fn handle_cli_command(
937        &self,
938        args: &[ffi::OsString],
939    ) -> anyhow::Result<serde_json::Value> {
940        <T as ClientModule>::handle_cli_command(self, args).await
941    }
942
943    async fn handle_rpc(
944        &self,
945        method: String,
946        request: serde_json::Value,
947    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
948        <T as ClientModule>::handle_rpc(self, method, request).await
949    }
950
951    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
952        <T as ClientModule>::input_fee(
953            self,
954            amount,
955            input
956                .as_any()
957                .downcast_ref()
958                .expect("Dispatched to correct module"),
959        )
960    }
961
962    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
963        <T as ClientModule>::output_fee(
964            self,
965            amount,
966            output
967                .as_any()
968                .downcast_ref()
969                .expect("Dispatched to correct module"),
970        )
971    }
972
973    fn supports_backup(&self) -> bool {
974        <T as ClientModule>::supports_backup(self)
975    }
976
977    async fn backup(
978        &self,
979        module_instance_id: ModuleInstanceId,
980    ) -> anyhow::Result<DynModuleBackup> {
981        Ok(DynModuleBackup::from_typed(
982            module_instance_id,
983            <T as ClientModule>::backup(self).await?,
984        ))
985    }
986
987    fn supports_being_primary(&self) -> bool {
988        <T as ClientModule>::supports_being_primary(self)
989    }
990
991    async fn create_final_inputs_and_outputs(
992        &self,
993        module_instance: ModuleInstanceId,
994        dbtx: &mut DatabaseTransaction<'_>,
995        operation_id: OperationId,
996        input_amount: Amount,
997        output_amount: Amount,
998    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
999        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1000            self,
1001            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1002            operation_id,
1003            input_amount,
1004            output_amount,
1005        )
1006        .await?;
1007
1008        let inputs = inputs.into_dyn(module_instance);
1009
1010        let outputs = outputs.into_dyn(module_instance);
1011
1012        Ok((inputs, outputs))
1013    }
1014
1015    async fn await_primary_module_output(
1016        &self,
1017        operation_id: OperationId,
1018        out_point: OutPoint,
1019    ) -> anyhow::Result<()> {
1020        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1021    }
1022
1023    async fn get_balance(
1024        &self,
1025        module_instance: ModuleInstanceId,
1026        dbtx: &mut DatabaseTransaction<'_>,
1027    ) -> Amount {
1028        <T as ClientModule>::get_balance(
1029            self,
1030            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1031        )
1032        .await
1033    }
1034
1035    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1036        <T as ClientModule>::subscribe_balance_changes(self).await
1037    }
1038}
1039
// Declares `DynClientModule`, a clonable newtype over a shared, type-erased
// `IClientModule` (`Arc<dyn IClientModule>`).
// NOTE(review): the exact items generated come from `dyn_newtype_define!`,
// which is defined outside this file — confirm there.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1044
1045impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1046    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1047        self.inner.as_ref()
1048    }
1049}
1050
/// A contiguous range of input/output indexes
///
/// The range is half-open: `start` is the first index covered and `end` is
/// one past the last, matching the [`ops::Range`] it iterates as.
#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    /// First index covered by the range (inclusive).
    start: u64,
    /// One past the last index covered by the range (exclusive).
    end: u64,
}
1057
1058impl IdxRange {
1059    pub fn new_single(start: u64) -> Option<Self> {
1060        start.checked_add(1).map(|end| Self { start, end })
1061    }
1062
1063    pub fn start(self) -> u64 {
1064        self.start
1065    }
1066
1067    pub fn count(self) -> usize {
1068        self.into_iter().count()
1069    }
1070
1071    pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1072        range.end().checked_add(1).map(|end| Self {
1073            start: *range.start(),
1074            end,
1075        })
1076    }
1077}
1078
1079impl From<Range<u64>> for IdxRange {
1080    fn from(Range { start, end }: Range<u64>) -> Self {
1081        Self { start, end }
1082    }
1083}
1084
1085impl IntoIterator for IdxRange {
1086    type Item = u64;
1087
1088    type IntoIter = ops::Range<u64>;
1089
1090    fn into_iter(self) -> Self::IntoIter {
1091        ops::Range {
1092            start: self.start,
1093            end: self.end,
1094        }
1095    }
1096}
1097
/// A contiguous range of output indexes within a single transaction.
///
/// Iterating it yields one [`OutPoint`] per index, all sharing `txid`.
#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Id of the transaction the indexes belong to.
    pub txid: TransactionId,
    /// Indexes covered within that transaction.
    idx_range: IdxRange,
}
1103
1104impl OutPointRange {
1105    pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1106        Self { txid, idx_range }
1107    }
1108
1109    pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1110        IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1111    }
1112
1113    pub fn start_idx(self) -> u64 {
1114        self.idx_range.start()
1115    }
1116
1117    pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1118        self.idx_range.into_iter()
1119    }
1120
1121    pub fn count(self) -> usize {
1122        self.idx_range.count()
1123    }
1124}
1125
1126impl IntoIterator for OutPointRange {
1127    type Item = OutPoint;
1128
1129    type IntoIter = OutPointRangeIter;
1130
1131    fn into_iter(self) -> Self::IntoIter {
1132        OutPointRangeIter {
1133            txid: self.txid,
1134            inner: self.idx_range.into_iter(),
1135        }
1136    }
1137}
1138
1139pub struct OutPointRangeIter {
1140    txid: TransactionId,
1141
1142    inner: ops::Range<u64>,
1143}
1144
impl OutPointRange {
    /// Returns the id of the transaction this range refers to (a copy of the
    /// public `txid` field).
    pub fn txid(&self) -> TransactionId {
        self.txid
    }
}
1150
1151impl Iterator for OutPointRangeIter {
1152    type Item = OutPoint;
1153
1154    fn next(&mut self) -> Option<Self::Item> {
1155        self.inner.next().map(|idx| OutPoint {
1156            txid: self.txid,
1157            out_idx: idx,
1158        })
1159    }
1160}
1161
/// Shared, reference-counted closure that maps an [`OutPointRange`] to a list
/// of `S` values.
///
/// NOTE(review): `S` is presumably a state-machine state created once the
/// transaction's out points are known — confirm against callers.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;