1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18mod pegin_monitor;
20mod withdraw;
21
22use std::collections::BTreeMap;
23use std::future;
24use std::sync::Arc;
25use std::time::SystemTime;
26
27use anyhow::{anyhow, bail, ensure, Context as AnyhowContext};
28use async_stream::stream;
29use backup::WalletModuleBackup;
30use bitcoin::address::NetworkUnchecked;
31use bitcoin::secp256k1::{All, Secp256k1, SECP256K1};
32use bitcoin::{Address, Network, ScriptBuf};
33use client_db::{DbKeyPrefix, PegInTweakIndexKey, TweakIdx};
34use fedimint_api_client::api::DynModuleApi;
35use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
36use fedimint_client::derivable_secret::{ChildId, DerivableSecret};
37use fedimint_client::module::init::{
38 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
39};
40use fedimint_client::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
41use fedimint_client::oplog::UpdateStreamOrOutcome;
42use fedimint_client::sm::util::MapStateTransitions;
43use fedimint_client::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
44use fedimint_client::transaction::{
45 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
46};
47use fedimint_client::{sm_enum_variant_translation, DynGlobalClientContext};
48use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
49use fedimint_core::db::{
50 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
51};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::envs::BitcoinRpcConfig;
54use fedimint_core::module::{
55 ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
56};
57use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup};
58use fedimint_core::util::backoff_util::background_backoff;
59use fedimint_core::util::{backoff_util, retry};
60use fedimint_core::{
61 apply, async_trait_maybe_send, push_db_pair_items, runtime, secp256k1, Amount, OutPoint,
62 TransactionId,
63};
64use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
65use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
66use fedimint_wallet_common::tweakable::Tweakable;
67pub use fedimint_wallet_common::*;
68use futures::{Stream, StreamExt};
69use rand::{thread_rng, Rng};
70use secp256k1::Keypair;
71use serde::{Deserialize, Serialize};
72use strum::IntoEnumIterator;
73use tokio::sync::watch;
74use tracing::{debug, instrument};
75
76use crate::api::WalletFederationApi;
77use crate::backup::WalletRecovery;
78use crate::client_db::{
79 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
80 PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey,
81};
82use crate::deposit::DepositStateMachine;
83use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
84
85const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
86
87#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
88pub struct BitcoinTransactionData {
89 pub btc_transaction: bitcoin::Transaction,
92 pub out_idx: u32,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
97pub enum DepositStateV1 {
98 WaitingForTransaction,
99 WaitingForConfirmation(BitcoinTransactionData),
100 Confirmed(BitcoinTransactionData),
101 Claimed(BitcoinTransactionData),
102 Failed(String),
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
106pub enum DepositStateV2 {
107 WaitingForTransaction,
108 WaitingForConfirmation {
109 #[serde(with = "bitcoin::amount::serde::as_sat")]
110 btc_deposited: bitcoin::Amount,
111 btc_out_point: bitcoin::OutPoint,
112 },
113 Confirmed {
114 #[serde(with = "bitcoin::amount::serde::as_sat")]
115 btc_deposited: bitcoin::Amount,
116 btc_out_point: bitcoin::OutPoint,
117 },
118 Claimed {
119 #[serde(with = "bitcoin::amount::serde::as_sat")]
120 btc_deposited: bitcoin::Amount,
121 btc_out_point: bitcoin::OutPoint,
122 },
123 Failed(String),
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
127pub enum WithdrawState {
128 Created,
129 Succeeded(bitcoin::Txid),
130 Failed(String),
131 }
135
136async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
137where
138 S: Stream<Item = WalletClientStates> + Unpin,
139{
140 loop {
141 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
142 return Some(ds.state);
143 }
144 tokio::task::yield_now().await;
145 }
146}
147
148#[derive(Debug, Clone, Default)]
149pub struct WalletClientInit(pub Option<BitcoinRpcConfig>);
151
152impl WalletClientInit {
153 pub fn new(rpc: BitcoinRpcConfig) -> Self {
154 Self(Some(rpc))
155 }
156}
157
158impl ModuleInit for WalletClientInit {
159 type Common = WalletCommonInit;
160
161 async fn dump_database(
162 &self,
163 dbtx: &mut DatabaseTransaction<'_>,
164 prefix_names: Vec<String>,
165 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
166 let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
167 BTreeMap::new();
168 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
169 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
170 });
171
172 for table in filtered_prefixes {
173 match table {
174 DbKeyPrefix::NextPegInTweakIndex => {
175 if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
176 wallet_client_items
177 .insert("NextPegInTweakIndex".to_string(), Box::new(index));
178 }
179 }
180 DbKeyPrefix::PegInTweakIndex => {
181 push_db_pair_items!(
182 dbtx,
183 PegInTweakIndexPrefix,
184 PegInTweakIndexKey,
185 PegInTweakIndexData,
186 wallet_client_items,
187 "Peg-In Tweak Index"
188 );
189 }
190 DbKeyPrefix::ClaimedPegIn => {
191 push_db_pair_items!(
192 dbtx,
193 ClaimedPegInPrefix,
194 ClaimedPegInKey,
195 ClaimedPegInData,
196 wallet_client_items,
197 "Claimed Peg-In"
198 );
199 }
200 DbKeyPrefix::RecoveryFinalized => {
201 if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
202 wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
203 }
204 }
205 DbKeyPrefix::RecoveryState => {}
206 }
207 }
208
209 Box::new(wallet_client_items.into_iter())
210 }
211}
212
213#[apply(async_trait_maybe_send!)]
214impl ClientModuleInit for WalletClientInit {
215 type Module = WalletClientModule;
216
217 fn supported_api_versions(&self) -> MultiApiVersion {
218 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
219 .expect("no version conflicts")
220 }
221
222 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
223 let data = WalletClientModuleData {
224 cfg: args.cfg().clone(),
225 module_root_secret: args.module_root_secret().clone(),
226 };
227
228 let rpc_config = self
229 .0
230 .clone()
231 .unwrap_or(WalletClientModule::get_rpc_config(args.cfg()));
232
233 let db = args.db().clone();
234
235 let btc_rpc = create_bitcoind(&rpc_config, TaskGroup::new().make_handle())?;
236 let module_api = args.module_api().clone();
237
238 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
239 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
240
241 Ok(WalletClientModule {
242 db,
243 data,
244 module_api,
245 notifier: args.notifier().clone(),
246 rpc: btc_rpc,
247 client_ctx: args.context(),
248 pegin_monitor_wakeup_sender,
249 pegin_monitor_wakeup_receiver,
250 pegin_claimed_receiver,
251 pegin_claimed_sender,
252 task_group: args.task_group().clone(),
253 admin_auth: args.admin_auth().cloned(),
254 })
255 }
256
257 async fn recover(
265 &self,
266 args: &ClientModuleRecoverArgs<Self>,
267 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
268 ) -> anyhow::Result<()> {
269 args.recover_from_history::<WalletRecovery>(self, snapshot)
270 .await
271 }
272}
273
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct WalletOperationMeta {
276 pub variant: WalletOperationMetaVariant,
277 pub extra_meta: serde_json::Value,
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
281#[serde(rename_all = "snake_case")]
282pub enum WalletOperationMetaVariant {
283 Deposit {
284 address: Address<NetworkUnchecked>,
285 #[serde(default)]
290 tweak_idx: Option<TweakIdx>,
291 #[serde(default, skip_serializing_if = "Option::is_none")]
292 expires_at: Option<SystemTime>,
293 },
294 Withdraw {
295 address: Address<NetworkUnchecked>,
296 #[serde(with = "bitcoin::amount::serde::as_sat")]
297 amount: bitcoin::Amount,
298 fee: PegOutFees,
299 change: Vec<OutPoint>,
300 },
301
302 RbfWithdraw {
303 rbf: Rbf,
304 change: Vec<OutPoint>,
305 },
306}
307
308#[derive(Debug, Clone)]
310pub struct WalletClientModuleData {
311 cfg: WalletClientConfig,
312 module_root_secret: DerivableSecret,
313}
314
315impl WalletClientModuleData {
316 fn derive_deposit_address(
317 &self,
318 idx: TweakIdx,
319 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
320 let idx = ChildId(idx.0);
321
322 let secret_tweak_key = self
323 .module_root_secret
324 .child_key(WALLET_TWEAK_CHILD_ID)
325 .child_key(idx)
326 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
327
328 let public_tweak_key = secret_tweak_key.public_key();
329
330 let address = self
331 .cfg
332 .peg_in_descriptor
333 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
334 .address(self.cfg.network)
335 .unwrap();
336
337 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
339
340 (secret_tweak_key, public_tweak_key, address, operation_id)
341 }
342
343 fn derive_peg_in_script(
344 &self,
345 idx: TweakIdx,
346 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
347 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
348
349 (
350 self.cfg
351 .peg_in_descriptor
352 .tweak(&secret_tweak_key.public_key(), SECP256K1)
353 .script_pubkey(),
354 address,
355 secret_tweak_key,
356 operation_id,
357 )
358 }
359}
360
361#[derive(Debug)]
362pub struct WalletClientModule {
363 data: WalletClientModuleData,
364 db: Database,
365 module_api: DynModuleApi,
366 notifier: ModuleNotifier<WalletClientStates>,
367 rpc: DynBitcoindRpc,
368 client_ctx: ClientContext<Self>,
369 pegin_monitor_wakeup_sender: watch::Sender<()>,
371 pegin_monitor_wakeup_receiver: watch::Receiver<()>,
372 pegin_claimed_sender: watch::Sender<()>,
374 pegin_claimed_receiver: watch::Receiver<()>,
375 task_group: TaskGroup,
376 admin_auth: Option<ApiAuth>,
377}
378
379#[apply(async_trait_maybe_send!)]
380impl ClientModule for WalletClientModule {
381 type Init = WalletClientInit;
382 type Common = WalletModuleTypes;
383 type Backup = WalletModuleBackup;
384 type ModuleStateMachineContext = WalletClientContext;
385 type States = WalletClientStates;
386
387 fn context(&self) -> Self::ModuleStateMachineContext {
388 WalletClientContext {
389 rpc: self.rpc.clone(),
390 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
391 wallet_decoder: self.decoder(),
392 secp: Secp256k1::default(),
393 }
394 }
395
396 async fn start(&self) {
397 self.task_group.spawn_cancellable("peg-in monitor", {
398 let client_ctx = self.client_ctx.clone();
399 let db = self.db.clone();
400 let btc_rpc = self.rpc.clone();
401 let module_api = self.module_api.clone();
402 let data = self.data.clone();
403 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
404 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
405 pegin_monitor::run_peg_in_monitor(
406 client_ctx,
407 db,
408 btc_rpc,
409 module_api,
410 data,
411 pegin_claimed_sender,
412 pegin_monitor_wakeup_receiver,
413 )
414 });
415 }
416
417 fn supports_backup(&self) -> bool {
418 true
419 }
420
421 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
422 let session_count = self.client_ctx.global_api().session_count().await?;
424
425 let mut dbtx = self.db.begin_transaction_nc().await;
426 let next_pegin_tweak_idx = dbtx
427 .get_value(&NextPegInTweakIndexKey)
428 .await
429 .unwrap_or_default();
430 let claimed = dbtx
431 .find_by_prefix(&PegInTweakIndexPrefix)
432 .await
433 .filter_map(|(k, v)| async move {
434 if v.claimed.is_empty() {
435 None
436 } else {
437 Some(k.0)
438 }
439 })
440 .collect()
441 .await;
442 Ok(backup::WalletModuleBackup::new_v1(
443 session_count,
444 next_pegin_tweak_idx,
445 claimed,
446 ))
447 }
448
449 fn input_fee(
450 &self,
451 _amount: Amount,
452 _input: &<Self::Common as ModuleCommon>::Input,
453 ) -> Option<Amount> {
454 Some(self.cfg().fee_consensus.peg_in_abs)
455 }
456
457 fn output_fee(
458 &self,
459 _amount: Amount,
460 _output: &<Self::Common as ModuleCommon>::Output,
461 ) -> Option<Amount> {
462 Some(self.cfg().fee_consensus.peg_out_abs)
463 }
464
465 #[cfg(feature = "cli")]
466 async fn handle_cli_command(
467 &self,
468 args: &[std::ffi::OsString],
469 ) -> anyhow::Result<serde_json::Value> {
470 cli::handle_cli_command(self, args).await
471 }
472}
473
474#[derive(Debug, Clone)]
475pub struct WalletClientContext {
476 rpc: DynBitcoindRpc,
477 wallet_descriptor: PegInDescriptor,
478 wallet_decoder: Decoder,
479 secp: Secp256k1<All>,
480}
481
482impl Context for WalletClientContext {
483 const KIND: Option<ModuleKind> = Some(KIND);
484}
485
486impl WalletClientModule {
487 fn cfg(&self) -> &WalletClientConfig {
488 &self.data.cfg
489 }
490
491 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
492 if let Ok(rpc_config) = BitcoinRpcConfig::get_defaults_from_env_vars() {
493 if rpc_config.kind == "bitcoind" {
496 cfg.default_bitcoin_rpc.clone()
497 } else {
498 rpc_config
499 }
500 } else {
501 cfg.default_bitcoin_rpc.clone()
502 }
503 }
504
505 pub fn get_network(&self) -> Network {
506 self.cfg().network
507 }
508
509 pub fn get_fee_consensus(&self) -> FeeConsensus {
510 self.cfg().fee_consensus
511 }
512
513 async fn allocate_deposit_address_inner(
514 &self,
515 dbtx: &mut DatabaseTransaction<'_>,
516 ) -> (OperationId, Address, TweakIdx) {
517 dbtx.ensure_isolated().expect("Must be isolated db");
518
519 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
520 let (_secret_tweak_key, _, address, operation_id) =
521 self.data.derive_deposit_address(tweak_idx);
522
523 let now = fedimint_core::time::now();
524
525 dbtx.insert_new_entry(
526 &PegInTweakIndexKey(tweak_idx),
527 &PegInTweakIndexData {
528 creation_time: now,
529 next_check_time: Some(now),
530 last_check_time: None,
531 operation_id,
532 claimed: vec![],
533 },
534 )
535 .await;
536
537 (operation_id, address, tweak_idx)
538 }
539
540 pub async fn get_withdraw_fees(
547 &self,
548 address: bitcoin::Address<NetworkUnchecked>,
549 amount: bitcoin::Amount,
550 ) -> anyhow::Result<PegOutFees> {
551 check_address(&address, self.cfg().network)?;
552
553 self.module_api
554 .fetch_peg_out_fees(&address.assume_checked(), amount)
555 .await?
556 .context("Federation didn't return peg-out fees")
557 }
558
559 pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
561 Ok(self.module_api.fetch_wallet_summary().await?)
562 }
563
564 pub fn create_withdraw_output(
565 &self,
566 operation_id: OperationId,
567 address: &bitcoin::Address<NetworkUnchecked>,
568 amount: bitcoin::Amount,
569 fees: PegOutFees,
570 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
571 check_address(address, self.cfg().network)?;
572
573 let output = WalletOutput::new_v0_peg_out(address.clone(), amount, fees);
574
575 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
576
577 let sm_gen = move |out_point_range: OutPointRange| {
578 assert_eq!(out_point_range.count(), 1);
579 let out_idx = out_point_range.start_idx();
580 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
581 operation_id,
582 state: WithdrawStates::Created(CreatedWithdrawState {
583 fm_outpoint: OutPoint {
584 txid: out_point_range.txid(),
585 out_idx,
586 },
587 }),
588 })]
589 };
590
591 Ok(ClientOutputBundle::new(
592 vec![ClientOutput::<WalletOutput> { output, amount }],
593 vec![ClientOutputSM::<WalletClientStates> {
594 state_machines: Arc::new(sm_gen),
595 }],
596 ))
597 }
598
599 pub fn create_rbf_withdraw_output(
600 &self,
601 operation_id: OperationId,
602 rbf: &Rbf,
603 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
604 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
605
606 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
607
608 let sm_gen = move |out_point_range: OutPointRange| {
609 assert_eq!(out_point_range.count(), 1);
610 let out_idx = out_point_range.start_idx();
611 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
612 operation_id,
613 state: WithdrawStates::Created(CreatedWithdrawState {
614 fm_outpoint: OutPoint {
615 txid: out_point_range.txid(),
616 out_idx,
617 },
618 }),
619 })]
620 };
621
622 Ok(ClientOutputBundle::new(
623 vec![ClientOutput::<WalletOutput> { output, amount }],
624 vec![ClientOutputSM::<WalletClientStates> {
625 state_machines: Arc::new(sm_gen),
626 }],
627 ))
628 }
629
630 pub async fn allocate_deposit_address_expert_only<M>(
648 &self,
649 extra_meta: M,
650 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
651 where
652 M: Serialize + MaybeSend + MaybeSync,
653 {
654 let extra_meta_value =
655 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
656 let (operation_id, address, tweak_idx) = self
657 .db
658 .autocommit(
659 move |dbtx, _| {
660 let extra_meta_value_inner = extra_meta_value.clone();
661 Box::pin(async move {
662 let (operation_id, address, tweak_idx) = self
663 .allocate_deposit_address_inner(dbtx)
664 .await;
665
666 self.client_ctx.manual_operation_start_dbtx(
667 dbtx,
668 operation_id,
669 WalletCommonInit::KIND.as_str(),
670 WalletOperationMeta {
671 variant: WalletOperationMetaVariant::Deposit {
672 address: address.to_string().parse().expect("can be parsed"),
674 tweak_idx: Some(tweak_idx),
675 expires_at: None,
676 },
677 extra_meta: extra_meta_value_inner,
678 },
679 vec![]
680 ).await?;
681
682 debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
683
684 self.rpc
686 .watch_script_history(&address.script_pubkey())
687 .await?;
688
689 let sender = self.pegin_monitor_wakeup_sender.clone();
690 dbtx.on_commit(move || {
691 let _ = sender.send(());
692 });
693
694 Ok((operation_id, address, tweak_idx))
695 })
696 },
697 Some(100),
698 )
699 .await
700 .map_err(|e| match e {
701 AutocommitError::CommitFailed {
702 last_error,
703 attempts,
704 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
705 AutocommitError::ClosureError { error, .. } => error,
706 })?;
707
708 Ok((operation_id, address, tweak_idx))
709 }
710
711 pub async fn subscribe_deposit(
717 &self,
718 operation_id: OperationId,
719 ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
720 let operation = self
721 .client_ctx
722 .get_operation(operation_id)
723 .await
724 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
725
726 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
727 bail!("Operation is not a wallet operation");
728 }
729
730 let operation_meta = operation.meta::<WalletOperationMeta>();
731
732 let WalletOperationMetaVariant::Deposit {
733 address, tweak_idx, ..
734 } = operation_meta.variant
735 else {
736 bail!("Operation is not a deposit operation");
737 };
738
739 let Some(tweak_idx) = tweak_idx else {
741 let outcome_v1 = operation
745 .outcome::<DepositStateV1>()
746 .context("Old pending deposit, can't subscribe to updates")?;
747
748 let outcome_v2 = match outcome_v1 {
749 DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
750 btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
751 btc_out_point: bitcoin::OutPoint {
752 txid: tx_info.btc_transaction.compute_txid(),
753 vout: tx_info.out_idx,
754 },
755 },
756 DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
757 _ => bail!("Non-final outcome in operation log"),
758 };
759
760 return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
761 };
762
763 Ok(self.client_ctx.outcome_or_updates(&operation, operation_id, || {
764 let stream_rpc = self.rpc.clone();
765 let stream_cient_ctx = self.client_ctx.clone();
766 let stream_script_pub_key = address.assume_checked().script_pubkey();
767
768 stream! {
769 yield DepositStateV2::WaitingForTransaction;
770
771 retry(
772 "subscribe script history",
773 background_backoff(),
774 || stream_rpc.watch_script_history(&stream_script_pub_key)
775 ).await.expect("Will never give up");
776 let (btc_out_point, btc_deposited) = retry(
777 "fetch history",
778 background_backoff(),
779 || async {
780 let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
781 history.first().and_then(|tx| {
782 let (out_idx, amount) = tx.output
783 .iter()
784 .enumerate()
785 .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
786 let txid = tx.compute_txid();
787
788 Some((
789 bitcoin::OutPoint {
790 txid,
791 vout: out_idx as u32,
792 },
793 amount
794 ))
795 }).context("No deposit transaction found")
796 }
797 ).await.expect("Will never give up");
798
799 yield DepositStateV2::WaitingForConfirmation {
800 btc_deposited,
801 btc_out_point
802 };
803
804 let claim_data = stream_cient_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
805 peg_in_index: tweak_idx,
806 btc_out_point,
807 }).await;
808
809 yield DepositStateV2::Confirmed {
810 btc_deposited,
811 btc_out_point
812 };
813
814 match stream_cient_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
815 Ok(()) => yield DepositStateV2::Claimed {
816 btc_deposited,
817 btc_out_point
818 },
819 Err(e) => yield DepositStateV2::Failed(e.to_string())
820 }
821 }
822 }))
823 }
824
825 pub async fn find_tweak_idx_by_operation_id(
826 &self,
827 operation_id: OperationId,
828 ) -> anyhow::Result<TweakIdx> {
829 Ok(self
830 .client_ctx
831 .module_db()
832 .clone()
833 .begin_transaction_nc()
834 .await
835 .find_by_prefix(&PegInTweakIndexPrefix)
836 .await
837 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
838 .next()
839 .await
840 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
841 .0
842 .0)
843 }
844
845 pub async fn get_pegin_tweak_idx(
846 &self,
847 tweak_idx: TweakIdx,
848 ) -> anyhow::Result<PegInTweakIndexData> {
849 self.client_ctx
850 .module_db()
851 .clone()
852 .begin_transaction_nc()
853 .await
854 .get_value(&PegInTweakIndexKey(tweak_idx))
855 .await
856 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
857 }
858
859 pub async fn get_claimed_pegins(
860 &self,
861 dbtx: &mut DatabaseTransaction<'_>,
862 tweak_idx: TweakIdx,
863 ) -> Vec<(
864 bitcoin::OutPoint,
865 TransactionId,
866 Vec<fedimint_core::OutPoint>,
867 )> {
868 let outpoints = dbtx
869 .get_value(&PegInTweakIndexKey(tweak_idx))
870 .await
871 .map(|v| v.claimed)
872 .unwrap_or_default();
873
874 let mut res = vec![];
875
876 for outpoint in outpoints {
877 let claimed_peg_in_data = dbtx
878 .get_value(&ClaimedPegInKey {
879 peg_in_index: tweak_idx,
880 btc_out_point: outpoint,
881 })
882 .await
883 .expect("Must have a corresponding claim record");
884 res.push((
885 outpoint,
886 claimed_peg_in_data.claim_txid,
887 claimed_peg_in_data.change,
888 ));
889 }
890
891 res
892 }
893
894 pub async fn recheck_pegin_address_by_op_id(
896 &self,
897 operation_id: OperationId,
898 ) -> anyhow::Result<()> {
899 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
900
901 self.recheck_pegin_address(tweak_idx).await
902 }
903
904 pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
906 self.db
907 .autocommit(
908 |dbtx, _| {
909 Box::pin(async {
910 let db_key = PegInTweakIndexKey(tweak_idx);
911 let db_val = dbtx
912 .get_value(&db_key)
913 .await
914 .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
915
916 dbtx.insert_entry(
917 &db_key,
918 &PegInTweakIndexData {
919 next_check_time: Some(fedimint_core::time::now()),
920 ..db_val
921 },
922 )
923 .await;
924
925 let sender = self.pegin_monitor_wakeup_sender.clone();
926 dbtx.on_commit(move || {
927 let _ = sender.send(());
928 });
929
930 Ok::<_, anyhow::Error>(())
931 })
932 },
933 Some(100),
934 )
935 .await?;
936
937 Ok(())
938 }
939
940 pub async fn await_num_deposit_by_operation_id(
942 &self,
943 operation_id: OperationId,
944 num_deposits: usize,
945 ) -> anyhow::Result<()> {
946 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
947 self.await_num_deposits(tweak_idx, num_deposits).await
948 }
949
950 #[instrument(skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
951 pub async fn await_num_deposits(
952 &self,
953 tweak_idx: TweakIdx,
954 num_deposits: usize,
955 ) -> anyhow::Result<()> {
956 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
957
958 let mut receiver = self.pegin_claimed_receiver.clone();
959 let mut backoff = backoff_util::aggressive_backoff();
960
961 loop {
962 let pegins = self
963 .get_claimed_pegins(
964 &mut self.client_ctx.module_db().begin_transaction_nc().await,
965 tweak_idx,
966 )
967 .await;
968
969 if pegins.len() < num_deposits {
970 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
971 self.recheck_pegin_address(tweak_idx).await?;
972 runtime::sleep(backoff.next().unwrap_or_default()).await;
973 receiver.changed().await?;
974 continue;
975 }
976
977 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
978
979 for (_outpoint, transaction_id, change) in pegins {
980 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
981 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
982
983 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
984 bail!("{}", e);
985 }
986
987 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
988 self.client_ctx
989 .await_primary_module_outputs(operation_id, change)
990 .await
991 .expect("Cannot fail if tx was accepted and federation is honest");
992 }
993
994 return Ok(());
995 }
996 }
997
998 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1003 &self,
1004 address: bitcoin::Address<NetworkUnchecked>,
1005 amount: bitcoin::Amount,
1006 fee: PegOutFees,
1007 extra_meta: M,
1008 ) -> anyhow::Result<OperationId> {
1009 {
1010 let operation_id = OperationId(thread_rng().gen());
1011
1012 let withdraw_output =
1013 self.create_withdraw_output(operation_id, &address, amount, fee)?;
1014 let tx_builder = TransactionBuilder::new()
1015 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1016
1017 let extra_meta =
1018 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1019 self.client_ctx
1020 .finalize_and_submit_transaction(
1021 operation_id,
1022 WalletCommonInit::KIND.as_str(),
1023 |_, change| WalletOperationMeta {
1024 variant: WalletOperationMetaVariant::Withdraw {
1025 address: address.clone(),
1026 amount,
1027 fee,
1028 change,
1029 },
1030 extra_meta: extra_meta.clone(),
1031 },
1032 tx_builder,
1033 )
1034 .await?;
1035
1036 Ok(operation_id)
1037 }
1038 }
1039
1040 #[deprecated(
1045 since = "0.4.0",
1046 note = "RBF withdrawals are rejected by the federation"
1047 )]
1048 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1049 &self,
1050 rbf: Rbf,
1051 extra_meta: M,
1052 ) -> anyhow::Result<OperationId> {
1053 let operation_id = OperationId(thread_rng().gen());
1054
1055 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1056 let tx_builder = TransactionBuilder::new()
1057 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1058
1059 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1060 self.client_ctx
1061 .finalize_and_submit_transaction(
1062 operation_id,
1063 WalletCommonInit::KIND.as_str(),
1064 |_, change| WalletOperationMeta {
1065 variant: WalletOperationMetaVariant::RbfWithdraw {
1066 rbf: rbf.clone(),
1067 change,
1068 },
1069 extra_meta: extra_meta.clone(),
1070 },
1071 tx_builder,
1072 )
1073 .await?;
1074
1075 Ok(operation_id)
1076 }
1077
1078 pub async fn subscribe_withdraw_updates(
1079 &self,
1080 operation_id: OperationId,
1081 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1082 let operation = self
1083 .client_ctx
1084 .get_operation(operation_id)
1085 .await
1086 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1087
1088 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1089 bail!("Operation is not a wallet operation");
1090 }
1091
1092 let operation_meta = operation.meta::<WalletOperationMeta>();
1093
1094 let (WalletOperationMetaVariant::Withdraw { change, .. }
1095 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1096 else {
1097 bail!("Operation is not a withdraw operation");
1098 };
1099
1100 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1101 let client_ctx = self.client_ctx.clone();
1102
1103 Ok(self
1104 .client_ctx
1105 .outcome_or_updates(&operation, operation_id, || {
1106 stream! {
1107 match next_withdraw_state(&mut operation_stream).await {
1108 Some(WithdrawStates::Created(_)) => {
1109 yield WithdrawState::Created;
1110 },
1111 Some(s) => {
1112 panic!("Unexpected state {s:?}")
1113 },
1114 None => return,
1115 }
1116
1117 let _ = client_ctx
1122 .await_primary_module_outputs(operation_id, change)
1123 .await;
1124
1125
1126 match next_withdraw_state(&mut operation_stream).await {
1127 Some(WithdrawStates::Aborted(inner)) => {
1128 yield WithdrawState::Failed(inner.error);
1129 },
1130 Some(WithdrawStates::Success(inner)) => {
1131 yield WithdrawState::Succeeded(inner.txid);
1132 },
1133 Some(s) => {
1134 panic!("Unexpected state {s:?}")
1135 },
1136 None => {},
1137 }
1138 }
1139 }))
1140 }
1141}
1142
1143fn check_address(address: &Address<NetworkUnchecked>, network: Network) -> anyhow::Result<()> {
1144 ensure!(
1145 address.is_valid_for_network(network),
1146 "Address isn't compatible with the federation's network: {network:?}"
1147 );
1148
1149 Ok(())
1150}
1151
1152async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1154 let index = dbtx
1155 .get_value(&NextPegInTweakIndexKey)
1156 .await
1157 .unwrap_or_default();
1158 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1159 .await;
1160 index
1161}
1162
1163#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1164pub enum WalletClientStates {
1165 Deposit(DepositStateMachine),
1166 Withdraw(WithdrawStateMachine),
1167}
1168
1169impl IntoDynInstance for WalletClientStates {
1170 type DynType = DynState;
1171
1172 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1173 DynState::from_typed(instance_id, self)
1174 }
1175}
1176
1177impl State for WalletClientStates {
1178 type ModuleContext = WalletClientContext;
1179
1180 fn transitions(
1181 &self,
1182 context: &Self::ModuleContext,
1183 global_context: &DynGlobalClientContext,
1184 ) -> Vec<StateTransition<Self>> {
1185 match self {
1186 WalletClientStates::Deposit(sm) => {
1187 sm_enum_variant_translation!(
1188 sm.transitions(context, global_context),
1189 WalletClientStates::Deposit
1190 )
1191 }
1192 WalletClientStates::Withdraw(sm) => {
1193 sm_enum_variant_translation!(
1194 sm.transitions(context, global_context),
1195 WalletClientStates::Withdraw
1196 )
1197 }
1198 }
1199 }
1200
1201 fn operation_id(&self) -> OperationId {
1202 match self {
1203 WalletClientStates::Deposit(sm) => sm.operation_id(),
1204 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1205 }
1206 }
1207}
1208
1209#[cfg(all(test, not(target_family = "wasm")))]
1210mod tests {
1211 use std::collections::BTreeSet;
1212 use std::sync::atomic::{AtomicBool, Ordering};
1213
1214 use super::*;
1215 use crate::backup::{
1216 recover_scan_idxes_for_activity, RecoverScanOutcome, RECOVER_NUM_IDX_ADD_TO_LAST_USED,
1217 };
1218
1219 #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread")]
1221 async fn sanity_test_recover_inner() {
1222 {
1223 let last_checked = AtomicBool::new(false);
1224 let last_checked = &last_checked;
1225 assert_eq!(
1226 recover_scan_idxes_for_activity(
1227 TweakIdx(0),
1228 &BTreeSet::new(),
1229 |cur_idx| async move {
1230 Ok(match cur_idx {
1231 TweakIdx(9) => {
1232 last_checked.store(true, Ordering::SeqCst);
1233 vec![]
1234 }
1235 TweakIdx(10) => panic!("Shouldn't happen"),
1236 TweakIdx(11) => {
1237 vec![0usize] }
1239 _ => vec![],
1240 })
1241 }
1242 )
1243 .await
1244 .unwrap(),
1245 RecoverScanOutcome {
1246 last_used_idx: None,
1247 new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1248 tweak_idxes_with_pegins: BTreeSet::from([])
1249 }
1250 );
1251 assert!(last_checked.load(Ordering::SeqCst));
1252 }
1253
1254 {
1255 let last_checked = AtomicBool::new(false);
1256 let last_checked = &last_checked;
1257 assert_eq!(
1258 recover_scan_idxes_for_activity(
1259 TweakIdx(0),
1260 &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
1261 |cur_idx| async move {
1262 Ok(match cur_idx {
1263 TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
1264 TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
1265 TweakIdx(11) => {
1266 last_checked.store(true, Ordering::SeqCst);
1267 vec![]
1268 }
1269 TweakIdx(12) => panic!("Shouldn't happen"),
1270 TweakIdx(13) => {
1271 vec![0usize] }
1273 _ => vec![],
1274 })
1275 }
1276 )
1277 .await
1278 .unwrap(),
1279 RecoverScanOutcome {
1280 last_used_idx: Some(TweakIdx(2)),
1281 new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1282 tweak_idxes_with_pegins: BTreeSet::from([])
1283 }
1284 );
1285 assert!(last_checked.load(Ordering::SeqCst));
1286 }
1287
1288 {
1289 let last_checked = AtomicBool::new(false);
1290 let last_checked = &last_checked;
1291 assert_eq!(
1292 recover_scan_idxes_for_activity(
1293 TweakIdx(10),
1294 &BTreeSet::new(),
1295 |cur_idx| async move {
1296 Ok(match cur_idx {
1297 TweakIdx(10) => vec![()],
1298 TweakIdx(19) => {
1299 last_checked.store(true, Ordering::SeqCst);
1300 vec![]
1301 }
1302 TweakIdx(20) => panic!("Shouldn't happen"),
1303 _ => vec![],
1304 })
1305 }
1306 )
1307 .await
1308 .unwrap(),
1309 RecoverScanOutcome {
1310 last_used_idx: Some(TweakIdx(10)),
1311 new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1312 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
1313 }
1314 );
1315 assert!(last_checked.load(Ordering::SeqCst));
1316 }
1317
1318 assert_eq!(
1319 recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
1320 Ok(match cur_idx {
1321 TweakIdx(6 | 15) => vec![()],
1322 _ => vec![],
1323 })
1324 })
1325 .await
1326 .unwrap(),
1327 RecoverScanOutcome {
1328 last_used_idx: Some(TweakIdx(15)),
1329 new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1330 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
1331 }
1332 );
1333 assert_eq!(
1334 recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1335 Ok(match cur_idx {
1336 TweakIdx(8) => {
1337 vec![()] }
1339 TweakIdx(9) => {
1340 panic!("Shouldn't happen")
1341 }
1342 _ => vec![],
1343 })
1344 })
1345 .await
1346 .unwrap(),
1347 RecoverScanOutcome {
1348 last_used_idx: None,
1349 new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1350 tweak_idxes_with_pegins: BTreeSet::from([])
1351 }
1352 );
1353 assert_eq!(
1354 recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1355 Ok(match cur_idx {
1356 TweakIdx(9) => panic!("Shouldn't happen"),
1357 TweakIdx(15) => vec![()],
1358 _ => vec![],
1359 })
1360 })
1361 .await
1362 .unwrap(),
1363 RecoverScanOutcome {
1364 last_used_idx: Some(TweakIdx(15)),
1365 new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1366 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
1367 }
1368 );
1369 }
1370}