1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::sync::{Arc, Weak};
6use std::{ffi, marker, ops};
7
8use anyhow::{anyhow, bail};
9use bitcoin::secp256k1::PublicKey;
10use fedimint_api_client::api::DynGlobalApi;
11use fedimint_core::config::ClientConfig;
12use fedimint_core::core::{
13 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
14 OperationId,
15};
16use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
17use fedimint_core::encoding::{Decodable, Encodable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24 apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send_sync, Amount, OutPoint,
25 PeerId, TransactionId,
26};
27use fedimint_eventlog::{Event, EventKind};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{OperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, Executor, State};
38use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
39use crate::{
40 oplog, AddStateMachinesResult, Client, InstancelessDynClientInputBundle, TransactionUpdates,
41};
42
43pub mod init;
44pub mod recovery;
45
/// Module registry whose entries are type-erased client modules
/// ([`DynClientModule`]).
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
/// Object-safe interface that [`ClientContext`] uses to reach the client.
///
/// A `Weak<dyn ClientContextIface>` is stored in [`FinalClientIface`]; the
/// methods of [`ClientContext`] forward to this trait.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the type-erased module for the given module instance id.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);

    /// Returns an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;

    /// Returns a reference to the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;

    /// Finalizes the transaction described by `tx_builder` and submits it.
    ///
    /// `operation_meta_gen` is handed an [`OutPointRange`] and must produce
    /// the JSON metadata for the operation of type `operation_type`.
    // NOTE(review): the exact meaning of the returned range (presumably the
    // outputs added while finalizing) is not visible here — confirm against
    // the implementor.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works inside
    /// a caller-supplied database transaction and takes no operation
    /// metadata.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] for the given operation.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits for the given primary-module `outputs` of the operation.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the client's operation log.
    fn operation_log(&self) -> &OperationLog;

    /// Reports whether the operation has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether an operation with the given id exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the client database.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &Executor;

    /// Returns an invite code for `peer`, or `None` if none is available.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the internal payment markers as a `(PublicKey, u64)` pair.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event whose payload has already been serialized to JSON.
    ///
    /// `module_kind`/`module_id` identify the module the event is attributed
    /// to, `kind` is the event kind and `persist` is the event's persistence
    /// flag.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    );
}
112
/// Clonable, late-initialized handle to the client.
///
/// Wraps a shared `OnceLock` holding a `Weak<dyn ClientContextIface>`: the
/// reference is installed once via `set` and upgraded on every `get`.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
120
121impl FinalClientIface {
122 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
128 self.0
129 .get()
130 .expect("client must be already set")
131 .upgrade()
132 .expect("client module context must not be use past client shutdown")
133 }
134
135 pub(crate) fn set(&self, client: Weak<dyn ClientContextIface>) {
136 self.0.set(client).expect("FinalLazyClient already set");
137 }
138}
139
/// Per-module handle to the client, typed by the module `M` it belongs to.
pub struct ClientContext<M> {
    /// Late-bound, weakly held reference to the client.
    client: FinalClientIface,
    /// Instance id of the module this context belongs to.
    module_instance_id: ModuleInstanceId,
    /// Token handed to `global_dbtx` whenever a database transaction has to
    /// be turned into a global one.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    /// Database handed out by `module_db()`, which asserts it is isolated.
    module_db: Database,
    /// Ties the context to `M` without storing a value of that type.
    _marker: marker::PhantomData<M>,
}
151
152impl<M> Clone for ClientContext<M> {
153 fn clone(&self) -> Self {
154 Self {
155 client: self.client.clone(),
156 module_db: self.module_db.clone(),
157 module_instance_id: self.module_instance_id,
158 _marker: marker::PhantomData,
159 global_dbtx_access_token: self.global_dbtx_access_token,
160 }
161 }
162}
163
/// Guard returned by [`ClientContext::self_ref`] that dereferences to the
/// module instance of type `M`.
///
/// It owns a strong reference to the client for as long as it is alive.
pub struct ClientContextSelfRef<'s, M> {
    /// Strong client reference through which the module is looked up.
    client: Arc<dyn ClientContextIface>,
    /// Instance id used for the module lookup.
    module_instance_id: ModuleInstanceId,
    /// Carries the `'s` lifetime and the module type without storing a
    /// reference.
    _marker: marker::PhantomData<&'s M>,
}
173
174impl<M> ops::Deref for ClientContextSelfRef<'_, M>
175where
176 M: ClientModule,
177{
178 type Target = M;
179
180 fn deref(&self) -> &Self::Target {
181 self.client
182 .get_module(self.module_instance_id)
183 .as_any()
184 .downcast_ref::<M>()
185 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
186 }
187}
188
189impl<M> fmt::Debug for ClientContext<M> {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 f.write_str("ClientContext")
192 }
193}
194
195impl<M> ClientContext<M>
196where
197 M: ClientModule,
198{
199 #[allow(clippy::needless_lifetimes)] pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
211 ClientContextSelfRef {
212 client: self.client.get(),
213 module_instance_id: self.module_instance_id,
214 _marker: marker::PhantomData,
215 }
216 }
217
218 pub fn global_api(&self) -> DynGlobalApi {
220 self.client.get().api_clone()
221 }
222
223 pub fn decoders(&self) -> ModuleDecoderRegistry {
225 Clone::clone(self.client.get().decoders())
226 }
227
228 pub fn input_from_dyn<'i>(
229 &self,
230 input: &'i DynInput,
231 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
232 (input.module_instance_id() == self.module_instance_id).then(|| {
233 input
234 .as_any()
235 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
236 .unwrap_or_else(|| {
237 panic!("instance_id {} just checked", input.module_instance_id())
238 })
239 })
240 }
241
242 pub fn output_from_dyn<'o>(
243 &self,
244 output: &'o DynOutput,
245 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
246 (output.module_instance_id() == self.module_instance_id).then(|| {
247 output
248 .as_any()
249 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
250 .unwrap_or_else(|| {
251 panic!("instance_id {} just checked", output.module_instance_id())
252 })
253 })
254 }
255
256 pub fn map_dyn<'s, 'i, 'o, I>(
257 &'s self,
258 typed: impl IntoIterator<Item = I> + 'i,
259 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
260 where
261 I: IntoDynInstance,
262 'i: 'o,
263 's: 'o,
264 {
265 typed.into_iter().map(|i| self.make_dyn(i))
266 }
267
268 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
270 self.make_dyn(output)
271 }
272
273 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
275 self.make_dyn(input)
276 }
277
278 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
280 where
281 I: IntoDynInstance,
282 {
283 typed.into_dyn(self.module_instance_id)
284 }
285
286 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
288 where
289 O: IntoDynInstance<DynType = DynOutput> + 'static,
290 S: IntoDynInstance<DynType = DynState> + 'static,
291 {
292 self.make_dyn(output)
293 }
294
295 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
297 where
298 I: IntoDynInstance<DynType = DynInput> + 'static,
299 S: IntoDynInstance<DynType = DynState> + 'static,
300 {
301 self.make_dyn(inputs)
302 }
303
304 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
305 where
306 S: sm::IState + 'static,
307 {
308 DynState::from_typed(self.module_instance_id, sm)
309 }
310
311 pub async fn finalize_and_submit_transaction<F, Meta>(
313 &self,
314 operation_id: OperationId,
315 operation_type: &str,
316 operation_meta_gen: F,
317 tx_builder: TransactionBuilder,
318 ) -> anyhow::Result<OutPointRange>
319 where
320 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
321 Meta: serde::Serialize + MaybeSend,
322 {
323 self.client
324 .get()
325 .finalize_and_submit_transaction(
326 operation_id,
327 operation_type,
328 Box::new(move |out_point_range| {
329 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
330 }),
331 tx_builder,
332 )
333 .await
334 }
335
336 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
338 self.client.get().transaction_updates(operation_id).await
339 }
340
341 pub async fn await_primary_module_outputs(
343 &self,
344 operation_id: OperationId,
345 outputs: Vec<OutPoint>,
347 ) -> anyhow::Result<()> {
348 self.client
349 .get()
350 .await_primary_module_outputs(operation_id, outputs)
351 .await
352 }
353
354 pub async fn get_operation(
356 &self,
357 operation_id: OperationId,
358 ) -> anyhow::Result<oplog::OperationLogEntry> {
359 let operation = self
360 .client
361 .get()
362 .operation_log()
363 .get_operation(operation_id)
364 .await
365 .ok_or(anyhow::anyhow!("Operation not found"))?;
366
367 if operation.operation_module_kind() != M::kind().as_str() {
368 bail!("Operation is not a lightning operation");
369 }
370
371 Ok(operation)
372 }
373
374 fn global_db(&self) -> fedimint_core::db::Database {
378 let db = Clone::clone(self.client.get().db());
379
380 db.ensure_global()
381 .expect("global_db must always return a global db");
382
383 db
384 }
385
386 pub fn module_db(&self) -> &Database {
387 self.module_db
388 .ensure_isolated()
389 .expect("module_db must always return isolated db");
390 &self.module_db
391 }
392
393 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
394 self.client.get().has_active_states(op_id).await
395 }
396
397 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
398 self.client.get().operation_exists(op_id).await
399 }
400
401 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
402 self.client
403 .get()
404 .executor()
405 .get_active_states()
406 .await
407 .into_iter()
408 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
409 .map(|s| {
410 (
411 Clone::clone(
412 s.0.as_any()
413 .downcast_ref::<M::States>()
414 .expect("incorrect output type passed to module plugin"),
415 ),
416 s.1,
417 )
418 })
419 .collect()
420 }
421
422 pub async fn get_config(&self) -> ClientConfig {
423 self.client.get().config().await
424 }
425
426 pub async fn get_invite_code(&self) -> InviteCode {
429 let cfg = self.get_config().await.global;
430 self.client
431 .get()
432 .invite_code(
433 *cfg.api_endpoints
434 .keys()
435 .next()
436 .expect("A federation always has at least one guardian"),
437 )
438 .await
439 .expect("The guardian we requested an invite code for exists")
440 }
441
442 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
443 self.client.get().get_internal_payment_markers()
444 }
445
446 pub async fn manual_operation_start(
449 &self,
450 operation_id: OperationId,
451 op_type: &str,
452 operation_meta: impl serde::Serialize + Debug,
453 sms: Vec<DynState>,
454 ) -> anyhow::Result<()> {
455 let db = self.module_db();
456 let mut dbtx = db.begin_transaction().await;
457 {
458 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
459
460 self.manual_operation_start_inner(
461 &mut dbtx.to_ref_nc(),
462 operation_id,
463 op_type,
464 operation_meta,
465 sms,
466 )
467 .await?;
468 }
469
470 dbtx.commit_tx_result().await.map_err(|_| {
471 anyhow!(
472 "Operation with id {} already exists",
473 operation_id.fmt_short()
474 )
475 })?;
476
477 Ok(())
478 }
479
480 pub async fn manual_operation_start_dbtx(
481 &self,
482 dbtx: &mut DatabaseTransaction<'_>,
483 operation_id: OperationId,
484 op_type: &str,
485 operation_meta: impl serde::Serialize + Debug,
486 sms: Vec<DynState>,
487 ) -> anyhow::Result<()> {
488 self.manual_operation_start_inner(
489 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
490 operation_id,
491 op_type,
492 operation_meta,
493 sms,
494 )
495 .await
496 }
497
498 async fn manual_operation_start_inner(
501 &self,
502 dbtx: &mut DatabaseTransaction<'_>,
503 operation_id: OperationId,
504 op_type: &str,
505 operation_meta: impl serde::Serialize + Debug,
506 sms: Vec<DynState>,
507 ) -> anyhow::Result<()> {
508 dbtx.ensure_global()
509 .expect("Must deal with global dbtx here");
510
511 if Client::operation_exists_dbtx(&mut dbtx.to_ref_nc(), operation_id).await {
512 bail!(
513 "Operation with id {} already exists",
514 operation_id.fmt_short()
515 );
516 }
517
518 self.client
519 .get()
520 .operation_log()
521 .add_operation_log_entry(&mut dbtx.to_ref_nc(), operation_id, op_type, operation_meta)
522 .await;
523
524 self.client
525 .get()
526 .executor()
527 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
528 .await
529 .expect("State machine is valid");
530
531 Ok(())
532 }
533
534 pub fn outcome_or_updates<U, S>(
535 &self,
536 operation: &OperationLogEntry,
537 operation_id: OperationId,
538 stream_gen: impl FnOnce() -> S,
539 ) -> UpdateStreamOrOutcome<U>
540 where
541 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
542 S: Stream<Item = U> + MaybeSend + 'static,
543 {
544 operation.outcome_or_updates(&self.global_db(), operation_id, stream_gen)
545 }
546
547 pub async fn claim_inputs<I, S>(
548 &self,
549 dbtx: &mut DatabaseTransaction<'_>,
550 inputs: ClientInputBundle<I, S>,
551 operation_id: OperationId,
552 ) -> anyhow::Result<OutPointRange>
553 where
554 I: IInput + MaybeSend + MaybeSync + 'static,
555 S: sm::IState + MaybeSend + MaybeSync + 'static,
556 {
557 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
558 .await
559 }
560
561 async fn claim_inputs_dyn(
562 &self,
563 dbtx: &mut DatabaseTransaction<'_>,
564 inputs: InstancelessDynClientInputBundle,
565 operation_id: OperationId,
566 ) -> anyhow::Result<OutPointRange> {
567 let tx_builder =
568 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
569
570 self.client
571 .get()
572 .finalize_and_submit_transaction_inner(
573 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
574 operation_id,
575 tx_builder,
576 )
577 .await
578 }
579
580 pub async fn add_state_machines_dbtx(
581 &self,
582 dbtx: &mut DatabaseTransaction<'_>,
583 states: Vec<DynState>,
584 ) -> AddStateMachinesResult {
585 self.client
586 .get()
587 .executor()
588 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
589 .await
590 }
591
592 pub async fn add_operation_log_entry_dbtx(
593 &self,
594 dbtx: &mut DatabaseTransaction<'_>,
595 operation_id: OperationId,
596 operation_type: &str,
597 operation_meta: impl serde::Serialize,
598 ) {
599 self.client
600 .get()
601 .operation_log()
602 .add_operation_log_entry(
603 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
604 operation_id,
605 operation_type,
606 operation_meta,
607 )
608 .await;
609 }
610
611 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
612 where
613 E: Event + Send,
614 Cap: Send,
615 {
616 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
617 warn!(
618 target: LOG_CLIENT,
619 module_kind = %<M as ClientModule>::kind(),
620 event_module = ?<E as Event>::MODULE,
621 "Client module logging events of different module than its own. This might become an error in the future."
622 );
623 }
624 self.client
625 .get()
626 .log_event_json(
627 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
628 <E as Event>::MODULE,
629 self.module_instance_id,
630 <E as Event>::KIND,
631 serde_json::to_value(event).expect("Can't fail"),
632 <E as Event>::PERSIST,
633 )
634 .await;
635 }
636}
637
638#[apply(async_trait_maybe_send!)]
640pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
641 type Init: ClientModuleInit;
642
643 type Common: ModuleCommon;
645
646 type Backup: ModuleBackup;
649
650 type ModuleStateMachineContext: Context;
653
654 type States: State<ModuleContext = Self::ModuleStateMachineContext>
656 + IntoDynInstance<DynType = DynState>;
657
658 fn decoder() -> Decoder {
659 let mut decoder_builder = Self::Common::decoder_builder();
660 decoder_builder.with_decodable_type::<Self::States>();
661 decoder_builder.with_decodable_type::<Self::Backup>();
662 decoder_builder.build()
663 }
664
665 fn kind() -> ModuleKind {
666 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
667 }
668
669 fn context(&self) -> Self::ModuleStateMachineContext;
670
671 async fn start(&self) {}
677
678 async fn handle_cli_command(
679 &self,
680 _args: &[ffi::OsString],
681 ) -> anyhow::Result<serde_json::Value> {
682 Err(anyhow::format_err!(
683 "This module does not implement cli commands"
684 ))
685 }
686
687 async fn handle_rpc(
688 &self,
689 _method: String,
690 _request: serde_json::Value,
691 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
692 Box::pin(futures::stream::once(std::future::ready(Err(
693 anyhow::format_err!("This module does not implement rpc"),
694 ))))
695 }
696
697 fn input_fee(
706 &self,
707 amount: Amount,
708 input: &<Self::Common as ModuleCommon>::Input,
709 ) -> Option<Amount>;
710
711 fn output_fee(
720 &self,
721 amount: Amount,
722 output: &<Self::Common as ModuleCommon>::Output,
723 ) -> Option<Amount>;
724
725 fn supports_backup(&self) -> bool {
726 false
727 }
728
729 async fn backup(&self) -> anyhow::Result<Self::Backup> {
730 anyhow::bail!("Backup not supported");
731 }
732
733 fn supports_being_primary(&self) -> bool {
742 false
743 }
744
745 async fn create_final_inputs_and_outputs(
764 &self,
765 _dbtx: &mut DatabaseTransaction<'_>,
766 _operation_id: OperationId,
767 _input_amount: Amount,
768 _output_amount: Amount,
769 ) -> anyhow::Result<(
770 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
771 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
772 )> {
773 unimplemented!()
774 }
775
776 async fn await_primary_module_output(
781 &self,
782 _operation_id: OperationId,
783 _out_point: OutPoint,
784 ) -> anyhow::Result<()> {
785 unimplemented!()
786 }
787
788 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
791 unimplemented!()
792 }
793
794 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
797 unimplemented!()
798 }
799
800 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
856 bail!("Unable to determine if safe to leave the federation: Not implemented")
857 }
858}
859
/// Type-erased counterpart of [`ClientModule`].
///
/// Typed inputs, outputs, states and backups are replaced by their `Dyn*`
/// forms, and methods that need it take the module instance id explicitly.
/// A blanket impl provides this trait for every [`ClientModule`].
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns `self` as `Any` so it can be downcast to the concrete module.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// Returns the module's decoder.
    fn decoder(&self) -> Decoder;

    /// Returns the module's state machine context, tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// Start hook of the module.
    async fn start(&self);

    /// Handles a CLI command.
    async fn handle_cli_command(&self, args: &[ffi::OsString])
        -> anyhow::Result<serde_json::Value>;

    /// Handles an RPC call and returns a stream of results.
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// Returns the fee for the type-erased `input` of the given `amount`.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;

    /// Returns the fee for the type-erased `output` of the given `amount`.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;

    /// Reports whether the module supports backups.
    fn supports_backup(&self) -> bool;

    /// Produces the module's backup, tagged with `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
        -> anyhow::Result<DynModuleBackup>;

    /// Reports whether the module can act as primary module.
    fn supports_being_primary(&self) -> bool;

    /// Creates the final, type-erased inputs and outputs for a transaction
    /// with the given input and output amounts.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// Waits for the given output of the primary module.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// Returns the balance of the given module instance.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount;

    /// Returns a stream of balance-change notifications.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
914
/// Blanket impl that exposes every typed [`ClientModule`] through the
/// type-erased [`IClientModule`] interface.
///
/// Most methods share their name with the `ClientModule` method they wrap,
/// so the delegations spell out `<T as ClientModule>::…` to select the typed
/// implementation.
#[apply(async_trait_maybe_send!)]
impl<T> IClientModule for T
where
    T: ClientModule,
{
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
        self
    }

    fn decoder(&self) -> Decoder {
        T::decoder()
    }

    /// Wraps the typed context into a [`DynContext`] tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext {
        DynContext::from_typed(instance, <T as ClientModule>::context(self))
    }

    async fn start(&self) {
        <T as ClientModule>::start(self).await;
    }

    async fn handle_cli_command(
        &self,
        args: &[ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        <T as ClientModule>::handle_cli_command(self, args).await
    }

    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        <T as ClientModule>::handle_rpc(self, method, request).await
    }

    /// Downcasts `input` to the module's typed input and delegates.
    ///
    /// Panics if `input` is not of this module's input type.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
        <T as ClientModule>::input_fee(
            self,
            amount,
            input
                .as_any()
                .downcast_ref()
                .expect("Dispatched to correct module"),
        )
    }

    /// Downcasts `output` to the module's typed output and delegates.
    ///
    /// Panics if `output` is not of this module's output type.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
        <T as ClientModule>::output_fee(
            self,
            amount,
            output
                .as_any()
                .downcast_ref()
                .expect("Dispatched to correct module"),
        )
    }

    fn supports_backup(&self) -> bool {
        <T as ClientModule>::supports_backup(self)
    }

    /// Wraps the typed backup into a [`DynModuleBackup`] tagged with
    /// `module_instance_id`.
    async fn backup(
        &self,
        module_instance_id: ModuleInstanceId,
    ) -> anyhow::Result<DynModuleBackup> {
        Ok(DynModuleBackup::from_typed(
            module_instance_id,
            <T as ClientModule>::backup(self).await?,
        ))
    }

    fn supports_being_primary(&self) -> bool {
        <T as ClientModule>::supports_being_primary(self)
    }

    /// Calls the typed implementation with a transaction prefixed by the
    /// module instance id, then converts both bundles to their dyn form.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
            self,
            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
            operation_id,
            input_amount,
            output_amount,
        )
        .await?;

        let inputs = inputs.into_dyn(module_instance);

        let outputs = outputs.into_dyn(module_instance);

        Ok((inputs, outputs))
    }

    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
    }

    /// Calls the typed implementation with a transaction prefixed by the
    /// module instance id.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount {
        <T as ClientModule>::get_balance(
            self,
            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
        )
        .await
    }

    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        <T as ClientModule>::subscribe_balance_changes(self).await
    }
}
1039
// Defines `DynClientModule`: a clonable newtype, generated by
// `dyn_newtype_define!`, around an `Arc` of a type-erased `IClientModule`.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1044
1045impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1046 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1047 self.inner.as_ref()
1048 }
1049}
1050
/// Half-open range `start..end` of `u64` indices.
///
/// Iterating it yields the same values as `ops::Range { start, end }`.
#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    /// First index of the range (inclusive).
    start: u64,
    /// End of the range (exclusive).
    end: u64,
}
1057
1058impl IdxRange {
1059 pub fn new_single(start: u64) -> Option<Self> {
1060 start.checked_add(1).map(|end| Self { start, end })
1061 }
1062
1063 pub fn start(self) -> u64 {
1064 self.start
1065 }
1066
1067 pub fn count(self) -> usize {
1068 self.into_iter().count()
1069 }
1070
1071 pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1072 range.end().checked_add(1).map(|end| Self {
1073 start: *range.start(),
1074 end,
1075 })
1076 }
1077}
1078
1079impl From<Range<u64>> for IdxRange {
1080 fn from(Range { start, end }: Range<u64>) -> Self {
1081 Self { start, end }
1082 }
1083}
1084
1085impl IntoIterator for IdxRange {
1086 type Item = u64;
1087
1088 type IntoIter = ops::Range<u64>;
1089
1090 fn into_iter(self) -> Self::IntoIter {
1091 ops::Range {
1092 start: self.start,
1093 end: self.end,
1094 }
1095 }
1096}
1097
/// A transaction id together with a range of output indices; iterating it
/// yields one [`OutPoint`] per index.
#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Transaction every out point of the range belongs to.
    pub txid: TransactionId,
    /// Output indices covered by the range.
    idx_range: IdxRange,
}
1103
1104impl OutPointRange {
1105 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1106 Self { txid, idx_range }
1107 }
1108
1109 pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1110 IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1111 }
1112
1113 pub fn start_idx(self) -> u64 {
1114 self.idx_range.start()
1115 }
1116
1117 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1118 self.idx_range.into_iter()
1119 }
1120
1121 pub fn count(self) -> usize {
1122 self.idx_range.count()
1123 }
1124}
1125
1126impl IntoIterator for OutPointRange {
1127 type Item = OutPoint;
1128
1129 type IntoIter = OutPointRangeIter;
1130
1131 fn into_iter(self) -> Self::IntoIter {
1132 OutPointRangeIter {
1133 txid: self.txid,
1134 inner: self.idx_range.into_iter(),
1135 }
1136 }
1137}
1138
/// Iterator over the [`OutPoint`]s of an [`OutPointRange`].
pub struct OutPointRangeIter {
    /// Transaction id put into every yielded out point.
    txid: TransactionId,

    /// Remaining output indices.
    inner: ops::Range<u64>,
}
1144
impl OutPointRange {
    /// Returns the id of the transaction the range refers to.
    pub fn txid(&self) -> TransactionId {
        self.txid
    }
}
1150
1151impl Iterator for OutPointRangeIter {
1152 type Item = OutPoint;
1153
1154 fn next(&mut self) -> Option<Self::Item> {
1155 self.inner.next().map(|idx| OutPoint {
1156 txid: self.txid,
1157 out_idx: idx,
1158 })
1159 }
1160}
1161
/// Shared closure that maps an [`OutPointRange`] to a list of states of
/// type `S`.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;