1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::{ffi, marker, ops};
6
7use anyhow::{anyhow, bail};
8use bitcoin::secp256k1::PublicKey;
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_core::config::ClientConfig;
11use fedimint_core::core::{
12 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
13 OperationId,
14};
15use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken};
16use fedimint_core::encoding::{Decodable, Encodable};
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
19use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
20use fedimint_core::task::{MaybeSend, MaybeSync};
21use fedimint_core::util::BoxStream;
22use fedimint_core::{
23 apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send_sync, Amount, OutPoint,
24 TransactionId,
25};
26use futures::Stream;
27use serde::de::DeserializeOwned;
28use serde::Serialize;
29
30use self::init::ClientModuleInit;
31use crate::db::event_log::Event;
32use crate::module::recovery::{DynModuleBackup, ModuleBackup};
33use crate::oplog::{OperationLogEntry, UpdateStreamOrOutcome};
34use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, State};
35use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
36use crate::{
37 oplog, AddStateMachinesResult, Client, ClientStrong, ClientWeak,
38 InstancelessDynClientInputBundle, TransactionUpdates,
39};
40
41pub mod init;
42pub mod recovery;
43
44pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
45
46#[derive(Clone, Default)]
52pub struct FinalClient(Arc<std::sync::OnceLock<ClientWeak>>);
53
54impl FinalClient {
55 pub(crate) fn get(&self) -> ClientStrong {
61 self.0
62 .get()
63 .expect("client must be already set")
64 .upgrade()
65 .expect("client module context must not be use past client shutdown")
66 }
67
68 pub(crate) fn set(&self, client: ClientWeak) {
69 self.0.set(client).expect("FinalLazyClient already set");
70 }
71}
72
73pub struct ClientContext<M> {
78 client: FinalClient,
79 module_instance_id: ModuleInstanceId,
80 global_dbtx_access_token: GlobalDBTxAccessToken,
81 module_db: Database,
82 _marker: marker::PhantomData<M>,
83}
84
85impl<M> Clone for ClientContext<M> {
86 fn clone(&self) -> Self {
87 Self {
88 client: self.client.clone(),
89 module_db: self.module_db.clone(),
90 module_instance_id: self.module_instance_id,
91 _marker: marker::PhantomData,
92 global_dbtx_access_token: self.global_dbtx_access_token,
93 }
94 }
95}
96
97pub struct ClientContextSelfRef<'s, M> {
100 client: ClientStrong,
103 module_instance_id: ModuleInstanceId,
104 _marker: marker::PhantomData<&'s M>,
105}
106
107impl<M> ops::Deref for ClientContextSelfRef<'_, M>
108where
109 M: ClientModule,
110{
111 type Target = M;
112
113 fn deref(&self) -> &Self::Target {
114 self.client
115 .get_module(self.module_instance_id)
116 .as_any()
117 .downcast_ref::<M>()
118 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
119 }
120}
121
122impl<M> fmt::Debug for ClientContext<M> {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.write_str("ClientContext")
125 }
126}
127
128impl<M> ClientContext<M>
129where
130 M: ClientModule,
131{
132 #[allow(clippy::needless_lifetimes)] pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
144 ClientContextSelfRef {
145 client: self.client.get(),
146 module_instance_id: self.module_instance_id,
147 _marker: marker::PhantomData,
148 }
149 }
150
151 pub fn global_api(&self) -> DynGlobalApi {
153 self.client.get().api_clone()
154 }
155 pub fn decoders(&self) -> ModuleDecoderRegistry {
156 self.client.get().decoders().clone()
157 }
158
159 pub fn input_from_dyn<'i>(
160 &self,
161 input: &'i DynInput,
162 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
163 (input.module_instance_id() == self.module_instance_id).then(|| {
164 input
165 .as_any()
166 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
167 .expect("instance_id just checked")
168 })
169 }
170
171 pub fn output_from_dyn<'o>(
172 &self,
173 output: &'o DynOutput,
174 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
175 (output.module_instance_id() == self.module_instance_id).then(|| {
176 output
177 .as_any()
178 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
179 .expect("instance_id just checked")
180 })
181 }
182
183 pub fn map_dyn<'s, 'i, 'o, I>(
184 &'s self,
185 typed: impl IntoIterator<Item = I> + 'i,
186 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
187 where
188 I: IntoDynInstance,
189 'i: 'o,
190 's: 'o,
191 {
192 typed.into_iter().map(|i| self.make_dyn(i))
193 }
194
195 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
197 self.make_dyn(output)
198 }
199
200 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
202 self.make_dyn(input)
203 }
204
205 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
207 where
208 I: IntoDynInstance,
209 {
210 typed.into_dyn(self.module_instance_id)
211 }
212
213 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
215 where
216 O: IntoDynInstance<DynType = DynOutput> + 'static,
217 S: IntoDynInstance<DynType = DynState> + 'static,
218 {
219 self.make_dyn(output)
220 }
221
222 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
224 where
225 I: IntoDynInstance<DynType = DynInput> + 'static,
226 S: IntoDynInstance<DynType = DynState> + 'static,
227 {
228 self.make_dyn(inputs)
229 }
230
231 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
232 where
233 S: sm::IState + 'static,
234 {
235 DynState::from_typed(self.module_instance_id, sm)
236 }
237
238 pub async fn finalize_and_submit_transaction<F, Meta>(
240 &self,
241 operation_id: OperationId,
242 operation_type: &str,
243 operation_meta: F,
244 tx_builder: TransactionBuilder,
245 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
246 where
247 F: Fn(TransactionId, Vec<OutPoint>) -> Meta + Clone + MaybeSend + MaybeSync,
248 Meta: serde::Serialize + MaybeSend,
249 {
250 self.client
251 .get()
252 .finalize_and_submit_transaction(
253 operation_id,
254 operation_type,
255 operation_meta,
256 tx_builder,
257 )
258 .await
259 }
260
261 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
263 self.client.get().transaction_updates(operation_id).await
264 }
265
266 pub async fn await_primary_module_outputs(
268 &self,
269 operation_id: OperationId,
270 outputs: Vec<OutPoint>,
271 ) -> anyhow::Result<()> {
272 self.client
273 .get()
274 .await_primary_module_outputs(operation_id, outputs)
275 .await
276 }
277
278 pub async fn get_operation(
280 &self,
281 operation_id: OperationId,
282 ) -> anyhow::Result<oplog::OperationLogEntry> {
283 let operation = self
284 .client
285 .get()
286 .operation_log()
287 .get_operation(operation_id)
288 .await
289 .ok_or(anyhow::anyhow!("Operation not found"))?;
290
291 if operation.operation_module_kind() != M::kind().as_str() {
292 bail!("Operation is not a lightning operation");
293 }
294
295 Ok(operation)
296 }
297
298 fn global_db(&self) -> fedimint_core::db::Database {
302 let db = self.client.get().db().clone();
303
304 db.ensure_global()
305 .expect("global_db must always return a global db");
306
307 db
308 }
309
310 pub fn module_db(&self) -> &Database {
311 self.module_db
312 .ensure_isolated()
313 .expect("module_db must always return isolated db");
314 &self.module_db
315 }
316
317 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
318 self.client.get().has_active_states(op_id).await
319 }
320
321 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
322 self.client.get().operation_exists(op_id).await
323 }
324
325 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
326 self.client
327 .get()
328 .executor
329 .get_active_states()
330 .await
331 .into_iter()
332 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
333 .map(|s| {
334 (
335 s.0.as_any()
336 .downcast_ref::<M::States>()
337 .expect("incorrect output type passed to module plugin")
338 .clone(),
339 s.1,
340 )
341 })
342 .collect()
343 }
344
345 pub async fn get_config(&self) -> ClientConfig {
346 self.client.get().config().await
347 }
348
349 pub async fn get_invite_code(&self) -> InviteCode {
352 let cfg = self.get_config().await.global;
353 self.client
354 .get()
355 .invite_code(
356 *cfg.api_endpoints
357 .keys()
358 .next()
359 .expect("A federation always has at least one guardian"),
360 )
361 .await
362 .expect("The guardian we requested an invite code for exists")
363 }
364
365 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
366 self.client.get().get_internal_payment_markers()
367 }
368
369 pub async fn manual_operation_start(
372 &self,
373 operation_id: OperationId,
374 op_type: &str,
375 operation_meta: impl serde::Serialize + Debug,
376 sms: Vec<DynState>,
377 ) -> anyhow::Result<()> {
378 let db = self.module_db();
379 let mut dbtx = db.begin_transaction().await;
380 {
381 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
382
383 self.manual_operation_start_inner(
384 &mut dbtx.to_ref_nc(),
385 operation_id,
386 op_type,
387 operation_meta,
388 sms,
389 )
390 .await?;
391 }
392
393 dbtx.commit_tx_result().await.map_err(|_| {
394 anyhow!(
395 "Operation with id {} already exists",
396 operation_id.fmt_short()
397 )
398 })?;
399
400 Ok(())
401 }
402
403 pub async fn manual_operation_start_dbtx(
404 &self,
405 dbtx: &mut DatabaseTransaction<'_>,
406 operation_id: OperationId,
407 op_type: &str,
408 operation_meta: impl serde::Serialize + Debug,
409 sms: Vec<DynState>,
410 ) -> anyhow::Result<()> {
411 self.manual_operation_start_inner(
412 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
413 operation_id,
414 op_type,
415 operation_meta,
416 sms,
417 )
418 .await
419 }
420
421 async fn manual_operation_start_inner(
424 &self,
425 dbtx: &mut DatabaseTransaction<'_>,
426 operation_id: OperationId,
427 op_type: &str,
428 operation_meta: impl serde::Serialize + Debug,
429 sms: Vec<DynState>,
430 ) -> anyhow::Result<()> {
431 dbtx.ensure_global()
432 .expect("Must deal with global dbtx here");
433
434 if Client::operation_exists_dbtx(&mut dbtx.to_ref_nc(), operation_id).await {
435 bail!(
436 "Operation with id {} already exists",
437 operation_id.fmt_short()
438 );
439 }
440
441 self.client
442 .get()
443 .operation_log
444 .add_operation_log_entry(&mut dbtx.to_ref_nc(), operation_id, op_type, operation_meta)
445 .await;
446
447 self.client
448 .get()
449 .executor
450 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
451 .await
452 .expect("State machine is valid");
453
454 Ok(())
455 }
456
457 pub fn outcome_or_updates<U, S>(
458 &self,
459 operation: &OperationLogEntry,
460 operation_id: OperationId,
461 stream_gen: impl FnOnce() -> S,
462 ) -> UpdateStreamOrOutcome<U>
463 where
464 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
465 S: Stream<Item = U> + MaybeSend + 'static,
466 {
467 operation.outcome_or_updates(&self.global_db(), operation_id, stream_gen)
468 }
469
470 pub async fn claim_inputs<I, S>(
471 &self,
472 dbtx: &mut DatabaseTransaction<'_>,
473 inputs: ClientInputBundle<I, S>,
474 operation_id: OperationId,
475 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
476 where
477 I: IInput + MaybeSend + MaybeSync + 'static,
478 S: sm::IState + MaybeSend + MaybeSync + 'static,
479 {
480 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
481 .await
482 }
483
484 async fn claim_inputs_dyn(
485 &self,
486 dbtx: &mut DatabaseTransaction<'_>,
487 inputs: InstancelessDynClientInputBundle,
488 operation_id: OperationId,
489 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
490 let tx_builder =
491 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
492
493 self.client
494 .get()
495 .finalize_and_submit_transaction_inner(
496 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
497 operation_id,
498 tx_builder,
499 )
500 .await
501 }
502
503 pub async fn add_state_machines_dbtx(
504 &self,
505 dbtx: &mut DatabaseTransaction<'_>,
506 states: Vec<DynState>,
507 ) -> AddStateMachinesResult {
508 self.client
509 .get()
510 .executor
511 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
512 .await
513 }
514
515 pub async fn add_operation_log_entry_dbtx(
516 &self,
517 dbtx: &mut DatabaseTransaction<'_>,
518 operation_id: OperationId,
519 operation_type: &str,
520 operation_meta: impl serde::Serialize,
521 ) {
522 self.client
523 .get()
524 .operation_log()
525 .add_operation_log_entry(
526 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
527 operation_id,
528 operation_type,
529 operation_meta,
530 )
531 .await;
532 }
533
534 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
535 where
536 E: Event + Send,
537 Cap: Send,
538 {
539 self.client
540 .get()
541 .log_event_dbtx(
542 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
543 Some(self.module_instance_id),
544 event,
545 )
546 .await;
547 }
548}
549
550#[apply(async_trait_maybe_send!)]
552pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
553 type Init: ClientModuleInit;
554
555 type Common: ModuleCommon;
557
558 type Backup: ModuleBackup;
561
562 type ModuleStateMachineContext: Context;
565
566 type States: State<ModuleContext = Self::ModuleStateMachineContext>
568 + IntoDynInstance<DynType = DynState>;
569
570 fn decoder() -> Decoder {
571 let mut decoder_builder = Self::Common::decoder_builder();
572 decoder_builder.with_decodable_type::<Self::States>();
573 decoder_builder.with_decodable_type::<Self::Backup>();
574 decoder_builder.build()
575 }
576
577 fn kind() -> ModuleKind {
578 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
579 }
580
581 fn context(&self) -> Self::ModuleStateMachineContext;
582
583 async fn start(&self) {}
589
590 async fn handle_cli_command(
591 &self,
592 _args: &[ffi::OsString],
593 ) -> anyhow::Result<serde_json::Value> {
594 Err(anyhow::format_err!(
595 "This module does not implement cli commands"
596 ))
597 }
598
599 async fn handle_rpc(
600 &self,
601 _method: String,
602 _request: serde_json::Value,
603 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
604 Box::pin(futures::stream::once(std::future::ready(Err(
605 anyhow::format_err!("This module does not implement rpc"),
606 ))))
607 }
608
609 fn input_fee(
618 &self,
619 amount: Amount,
620 input: &<Self::Common as ModuleCommon>::Input,
621 ) -> Option<Amount>;
622
623 fn output_fee(
632 &self,
633 amount: Amount,
634 output: &<Self::Common as ModuleCommon>::Output,
635 ) -> Option<Amount>;
636
637 fn supports_backup(&self) -> bool {
638 false
639 }
640
641 async fn backup(&self) -> anyhow::Result<Self::Backup> {
642 anyhow::bail!("Backup not supported");
643 }
644
645 fn supports_being_primary(&self) -> bool {
654 false
655 }
656
657 async fn create_final_inputs_and_outputs(
676 &self,
677 _dbtx: &mut DatabaseTransaction<'_>,
678 _operation_id: OperationId,
679 _input_amount: Amount,
680 _output_amount: Amount,
681 ) -> anyhow::Result<(
682 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
683 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
684 )> {
685 unimplemented!()
686 }
687
688 async fn await_primary_module_output(
693 &self,
694 _operation_id: OperationId,
695 _out_point: OutPoint,
696 ) -> anyhow::Result<()> {
697 unimplemented!()
698 }
699
700 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
703 unimplemented!()
704 }
705
706 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
709 unimplemented!()
710 }
711
712 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
768 bail!("Unable to determine if safe to leave the federation: Not implemented")
769 }
770}
771
772#[apply(async_trait_maybe_send!)]
774pub trait IClientModule: Debug {
775 fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
776
777 fn decoder(&self) -> Decoder;
778
779 fn context(&self, instance: ModuleInstanceId) -> DynContext;
780
781 async fn start(&self);
782
783 async fn handle_cli_command(&self, args: &[ffi::OsString])
784 -> anyhow::Result<serde_json::Value>;
785
786 async fn handle_rpc(
787 &self,
788 method: String,
789 request: serde_json::Value,
790 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
791
792 fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;
793
794 fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;
795
796 fn supports_backup(&self) -> bool;
797
798 async fn backup(&self, module_instance_id: ModuleInstanceId)
799 -> anyhow::Result<DynModuleBackup>;
800
801 fn supports_being_primary(&self) -> bool;
802
803 async fn create_final_inputs_and_outputs(
804 &self,
805 module_instance: ModuleInstanceId,
806 dbtx: &mut DatabaseTransaction<'_>,
807 operation_id: OperationId,
808 input_amount: Amount,
809 output_amount: Amount,
810 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
811
812 async fn await_primary_module_output(
813 &self,
814 operation_id: OperationId,
815 out_point: OutPoint,
816 ) -> anyhow::Result<()>;
817
818 async fn get_balance(
819 &self,
820 module_instance: ModuleInstanceId,
821 dbtx: &mut DatabaseTransaction<'_>,
822 ) -> Amount;
823
824 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
825}
826
827#[apply(async_trait_maybe_send!)]
828impl<T> IClientModule for T
829where
830 T: ClientModule,
831{
832 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
833 self
834 }
835
836 fn decoder(&self) -> Decoder {
837 T::decoder()
838 }
839
840 fn context(&self, instance: ModuleInstanceId) -> DynContext {
841 DynContext::from_typed(instance, <T as ClientModule>::context(self))
842 }
843
844 async fn start(&self) {
845 <T as ClientModule>::start(self).await;
846 }
847
848 async fn handle_cli_command(
849 &self,
850 args: &[ffi::OsString],
851 ) -> anyhow::Result<serde_json::Value> {
852 <T as ClientModule>::handle_cli_command(self, args).await
853 }
854
855 async fn handle_rpc(
856 &self,
857 method: String,
858 request: serde_json::Value,
859 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
860 <T as ClientModule>::handle_rpc(self, method, request).await
861 }
862
863 fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
864 <T as ClientModule>::input_fee(
865 self,
866 amount,
867 input
868 .as_any()
869 .downcast_ref()
870 .expect("Dispatched to correct module"),
871 )
872 }
873
874 fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
875 <T as ClientModule>::output_fee(
876 self,
877 amount,
878 output
879 .as_any()
880 .downcast_ref()
881 .expect("Dispatched to correct module"),
882 )
883 }
884
885 fn supports_backup(&self) -> bool {
886 <T as ClientModule>::supports_backup(self)
887 }
888
889 async fn backup(
890 &self,
891 module_instance_id: ModuleInstanceId,
892 ) -> anyhow::Result<DynModuleBackup> {
893 Ok(DynModuleBackup::from_typed(
894 module_instance_id,
895 <T as ClientModule>::backup(self).await?,
896 ))
897 }
898
899 fn supports_being_primary(&self) -> bool {
900 <T as ClientModule>::supports_being_primary(self)
901 }
902
903 async fn create_final_inputs_and_outputs(
904 &self,
905 module_instance: ModuleInstanceId,
906 dbtx: &mut DatabaseTransaction<'_>,
907 operation_id: OperationId,
908 input_amount: Amount,
909 output_amount: Amount,
910 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
911 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
912 self,
913 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
914 operation_id,
915 input_amount,
916 output_amount,
917 )
918 .await?;
919
920 let inputs = inputs.into_dyn(module_instance);
921
922 let outputs = outputs.into_dyn(module_instance);
923
924 Ok((inputs, outputs))
925 }
926
927 async fn await_primary_module_output(
928 &self,
929 operation_id: OperationId,
930 out_point: OutPoint,
931 ) -> anyhow::Result<()> {
932 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
933 }
934
935 async fn get_balance(
936 &self,
937 module_instance: ModuleInstanceId,
938 dbtx: &mut DatabaseTransaction<'_>,
939 ) -> Amount {
940 <T as ClientModule>::get_balance(
941 self,
942 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
943 )
944 .await
945 }
946
947 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
948 <T as ClientModule>::subscribe_balance_changes(self).await
949 }
950}
951
952dyn_newtype_define!(
953 #[derive(Clone)]
954 pub DynClientModule(Arc<IClientModule>)
955);
956
957impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
958 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
959 self.inner.as_ref()
960 }
961}
962
963#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
965pub struct IdxRange {
966 start: u64,
967 end_inclusive: u64,
968}
969
970impl IdxRange {
971 pub fn new_single(start: u64) -> Self {
972 Self {
973 start,
974 end_inclusive: start,
975 }
976 }
977
978 pub fn start(self) -> u64 {
979 self.start
980 }
981
982 pub fn count(self) -> usize {
983 self.into_iter().count()
984 }
985}
986
987impl IntoIterator for IdxRange {
988 type Item = u64;
989
990 type IntoIter = ops::RangeInclusive<u64>;
991
992 fn into_iter(self) -> Self::IntoIter {
993 ops::RangeInclusive::new(self.start, self.end_inclusive)
994 }
995}
996
997impl From<ops::RangeInclusive<u64>> for IdxRange {
998 fn from(value: ops::RangeInclusive<u64>) -> Self {
999 Self {
1000 start: *value.start(),
1001 end_inclusive: *value.end(),
1002 }
1003 }
1004}
1005
1006#[derive(Copy, Clone, Encodable, Decodable, PartialEq, Eq, Hash, Debug)]
1007pub struct OutPointRange {
1008 txid: TransactionId,
1009 idx_range: IdxRange,
1010}
1011
1012impl OutPointRange {
1013 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1014 Self { txid, idx_range }
1015 }
1016
1017 pub fn new_single(txid: TransactionId, idx: u64) -> Self {
1018 Self {
1019 txid,
1020 idx_range: IdxRange::new_single(idx),
1021 }
1022 }
1023
1024 pub fn start_idx(self) -> u64 {
1025 self.idx_range.start()
1026 }
1027
1028 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1029 self.idx_range.into_iter()
1030 }
1031
1032 pub fn count(self) -> usize {
1033 self.idx_range.count()
1034 }
1035}
1036
1037impl IntoIterator for OutPointRange {
1038 type Item = OutPoint;
1039
1040 type IntoIter = OutPointRangeIter;
1041
1042 fn into_iter(self) -> Self::IntoIter {
1043 OutPointRangeIter {
1044 txid: self.txid,
1045 inner: self.idx_range.into_iter(),
1046 }
1047 }
1048}
1049
1050pub struct OutPointRangeIter {
1051 txid: TransactionId,
1052
1053 inner: ops::RangeInclusive<u64>,
1054}
1055
1056impl OutPointRange {
1057 pub fn txid(&self) -> TransactionId {
1058 self.txid
1059 }
1060}
1061
1062impl Iterator for OutPointRangeIter {
1063 type Item = OutPoint;
1064
1065 fn next(&mut self) -> Option<Self::Item> {
1066 self.inner.next().map(|idx| OutPoint {
1067 txid: self.txid,
1068 out_idx: idx,
1069 })
1070 }
1071}
1072
1073pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;