fedimint_wallet_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15/// Legacy, state-machine based peg-ins, replaced by `pegin_monitor`
16/// but retained for time being to ensure existing peg-ins complete.
17mod deposit;
18pub mod events;
19/// Peg-in monitor: a task monitoring deposit addresses for peg-ins.
20mod pegin_monitor;
21mod withdraw;
22
23use std::collections::BTreeMap;
24use std::future;
25use std::sync::Arc;
26use std::time::{Duration, SystemTime};
27
28use anyhow::{anyhow, bail, ensure, Context as AnyhowContext};
29use async_stream::stream;
30use backup::WalletModuleBackup;
31use bitcoin::address::NetworkUnchecked;
32use bitcoin::secp256k1::{All, Secp256k1, SECP256K1};
33use bitcoin::{Address, Network, ScriptBuf};
34use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
35use fedimint_api_client::api::{DynModuleApi, FederationResult};
36use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
37use fedimint_client::derivable_secret::{ChildId, DerivableSecret};
38use fedimint_client::module::init::{
39    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
42use fedimint_client::oplog::UpdateStreamOrOutcome;
43use fedimint_client::sm::util::MapStateTransitions;
44use fedimint_client::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
45use fedimint_client::transaction::{
46    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
47};
48use fedimint_client::{sm_enum_variant_translation, DynGlobalClientContext};
49use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
50use fedimint_core::db::{
51    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::{Decodable, Encodable};
54use fedimint_core::envs::{is_running_in_test_env, BitcoinRpcConfig};
55use fedimint_core::module::{
56    ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion, ModuleInit,
57    MultiApiVersion,
58};
59use fedimint_core::task::{sleep, MaybeSend, MaybeSync, TaskGroup};
60use fedimint_core::util::backoff_util::background_backoff;
61use fedimint_core::util::{backoff_util, retry};
62use fedimint_core::{
63    apply, async_trait_maybe_send, push_db_pair_items, runtime, secp256k1, Amount, OutPoint,
64    TransactionId,
65};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
68use fedimint_wallet_common::tweakable::Tweakable;
69pub use fedimint_wallet_common::*;
70use futures::{Stream, StreamExt};
71use rand::{thread_rng, Rng};
72use secp256k1::Keypair;
73use serde::{Deserialize, Serialize};
74use strum::IntoEnumIterator;
75use tokio::sync::watch;
76use tracing::{debug, instrument};
77
78use crate::api::WalletFederationApi;
79use crate::backup::WalletRecovery;
80use crate::client_db::{
81    ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
82    PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
83};
84use crate::deposit::DepositStateMachine;
85use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
86
/// Child id under the module root secret from which the per-address peg-in
/// tweak keys are derived (see
/// `WalletClientModuleData::derive_deposit_address`).
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
88
89#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
90pub struct BitcoinTransactionData {
91    /// The bitcoin transaction is saved as soon as we see it so the transaction
92    /// can be re-transmitted if it's evicted from the mempool.
93    pub btc_transaction: bitcoin::Transaction,
94    /// Index of the deposit output
95    pub out_idx: u32,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
99pub enum DepositStateV1 {
100    WaitingForTransaction,
101    WaitingForConfirmation(BitcoinTransactionData),
102    Confirmed(BitcoinTransactionData),
103    Claimed(BitcoinTransactionData),
104    Failed(String),
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
108pub enum DepositStateV2 {
109    WaitingForTransaction,
110    WaitingForConfirmation {
111        #[serde(with = "bitcoin::amount::serde::as_sat")]
112        btc_deposited: bitcoin::Amount,
113        btc_out_point: bitcoin::OutPoint,
114    },
115    Confirmed {
116        #[serde(with = "bitcoin::amount::serde::as_sat")]
117        btc_deposited: bitcoin::Amount,
118        btc_out_point: bitcoin::OutPoint,
119    },
120    Claimed {
121        #[serde(with = "bitcoin::amount::serde::as_sat")]
122        btc_deposited: bitcoin::Amount,
123        btc_out_point: bitcoin::OutPoint,
124    },
125    Failed(String),
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
129pub enum WithdrawState {
130    Created,
131    Succeeded(bitcoin::Txid),
132    Failed(String),
133    // TODO: track refund
134    // Refunded,
135    // RefundFailed(String),
136}
137
138async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
139where
140    S: Stream<Item = WalletClientStates> + Unpin,
141{
142    loop {
143        if let WalletClientStates::Withdraw(ds) = stream.next().await? {
144            return Some(ds.state);
145        }
146        tokio::task::yield_now().await;
147    }
148}
149
150#[derive(Debug, Clone, Default)]
151// TODO: should probably move to DB
152pub struct WalletClientInit(pub Option<BitcoinRpcConfig>);
153
154impl WalletClientInit {
155    pub fn new(rpc: BitcoinRpcConfig) -> Self {
156        Self(Some(rpc))
157    }
158}
159
160impl ModuleInit for WalletClientInit {
161    type Common = WalletCommonInit;
162
163    async fn dump_database(
164        &self,
165        dbtx: &mut DatabaseTransaction<'_>,
166        prefix_names: Vec<String>,
167    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
168        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
169            BTreeMap::new();
170        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
171            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
172        });
173
174        for table in filtered_prefixes {
175            match table {
176                DbKeyPrefix::NextPegInTweakIndex => {
177                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
178                        wallet_client_items
179                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
180                    }
181                }
182                DbKeyPrefix::PegInTweakIndex => {
183                    push_db_pair_items!(
184                        dbtx,
185                        PegInTweakIndexPrefix,
186                        PegInTweakIndexKey,
187                        PegInTweakIndexData,
188                        wallet_client_items,
189                        "Peg-In Tweak Index"
190                    );
191                }
192                DbKeyPrefix::ClaimedPegIn => {
193                    push_db_pair_items!(
194                        dbtx,
195                        ClaimedPegInPrefix,
196                        ClaimedPegInKey,
197                        ClaimedPegInData,
198                        wallet_client_items,
199                        "Claimed Peg-In"
200                    );
201                }
202                DbKeyPrefix::RecoveryFinalized => {
203                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
204                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
205                    }
206                }
207                DbKeyPrefix::SupportsSafeDeposit => {
208                    push_db_pair_items!(
209                        dbtx,
210                        SupportsSafeDepositPrefix,
211                        SupportsSafeDepositKey,
212                        (),
213                        wallet_client_items,
214                        "Supports Safe Deposit"
215                    );
216                }
217                DbKeyPrefix::RecoveryState
218                | DbKeyPrefix::ExternalReservedStart
219                | DbKeyPrefix::CoreInternalReservedStart
220                | DbKeyPrefix::CoreInternalReservedEnd => {}
221            }
222        }
223
224        Box::new(wallet_client_items.into_iter())
225    }
226}
227
228#[apply(async_trait_maybe_send!)]
229impl ClientModuleInit for WalletClientInit {
230    type Module = WalletClientModule;
231
232    fn supported_api_versions(&self) -> MultiApiVersion {
233        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
234            .expect("no version conflicts")
235    }
236
237    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
238        let data = WalletClientModuleData {
239            cfg: args.cfg().clone(),
240            module_root_secret: args.module_root_secret().clone(),
241        };
242
243        let rpc_config = self
244            .0
245            .clone()
246            .unwrap_or(WalletClientModule::get_rpc_config(args.cfg()));
247
248        let db = args.db().clone();
249
250        let btc_rpc = create_bitcoind(&rpc_config)?;
251        let module_api = args.module_api().clone();
252
253        let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
254        let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
255
256        Ok(WalletClientModule {
257            db,
258            data,
259            module_api,
260            notifier: args.notifier().clone(),
261            rpc: btc_rpc,
262            client_ctx: args.context(),
263            pegin_monitor_wakeup_sender,
264            pegin_monitor_wakeup_receiver,
265            pegin_claimed_receiver,
266            pegin_claimed_sender,
267            task_group: args.task_group().clone(),
268            admin_auth: args.admin_auth().cloned(),
269        })
270    }
271
272    /// Wallet recovery
273    ///
274    /// Query bitcoin rpc for history of addresses from last known used
275    /// addresses (or index 0) until MAX_GAP unused ones.
276    ///
277    /// Notably does not persist the progress of addresses being queried,
278    /// because it is not expected that it would take long enough to bother.
279    async fn recover(
280        &self,
281        args: &ClientModuleRecoverArgs<Self>,
282        snapshot: Option<&<Self::Module as ClientModule>::Backup>,
283    ) -> anyhow::Result<()> {
284        args.recover_from_history::<WalletRecovery>(self, snapshot)
285            .await
286    }
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct WalletOperationMeta {
291    pub variant: WalletOperationMetaVariant,
292    pub extra_meta: serde_json::Value,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
296#[serde(rename_all = "snake_case")]
297pub enum WalletOperationMetaVariant {
298    Deposit {
299        address: Address<NetworkUnchecked>,
300        /// Added in 0.4.2, can be `None` for old deposits or `Some` for ones
301        /// using the pegin monitor. The value is the child index of the key
302        /// used to generate the address, so we can re-generate the secret key
303        /// from our root secret.
304        #[serde(default)]
305        tweak_idx: Option<TweakIdx>,
306        #[serde(default, skip_serializing_if = "Option::is_none")]
307        expires_at: Option<SystemTime>,
308    },
309    Withdraw {
310        address: Address<NetworkUnchecked>,
311        #[serde(with = "bitcoin::amount::serde::as_sat")]
312        amount: bitcoin::Amount,
313        fee: PegOutFees,
314        change: Vec<OutPoint>,
315    },
316
317    RbfWithdraw {
318        rbf: Rbf,
319        change: Vec<OutPoint>,
320    },
321}
322
323/// The non-resource, just plain-data parts of [`WalletClientModule`]
324#[derive(Debug, Clone)]
325pub struct WalletClientModuleData {
326    cfg: WalletClientConfig,
327    module_root_secret: DerivableSecret,
328}
329
330impl WalletClientModuleData {
331    fn derive_deposit_address(
332        &self,
333        idx: TweakIdx,
334    ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
335        let idx = ChildId(idx.0);
336
337        let secret_tweak_key = self
338            .module_root_secret
339            .child_key(WALLET_TWEAK_CHILD_ID)
340            .child_key(idx)
341            .to_secp_key(fedimint_core::secp256k1::SECP256K1);
342
343        let public_tweak_key = secret_tweak_key.public_key();
344
345        let address = self
346            .cfg
347            .peg_in_descriptor
348            .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
349            .address(self.cfg.network.0)
350            .unwrap();
351
352        // TODO: make hash?
353        let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
354
355        (secret_tweak_key, public_tweak_key, address, operation_id)
356    }
357
358    fn derive_peg_in_script(
359        &self,
360        idx: TweakIdx,
361    ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
362        let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
363
364        (
365            self.cfg
366                .peg_in_descriptor
367                .tweak(&secret_tweak_key.public_key(), SECP256K1)
368                .script_pubkey(),
369            address,
370            secret_tweak_key,
371            operation_id,
372        )
373    }
374}
375
376#[derive(Debug)]
377pub struct WalletClientModule {
378    data: WalletClientModuleData,
379    db: Database,
380    module_api: DynModuleApi,
381    notifier: ModuleNotifier<WalletClientStates>,
382    rpc: DynBitcoindRpc,
383    client_ctx: ClientContext<Self>,
384    /// Updated to wake up pegin monitor
385    pegin_monitor_wakeup_sender: watch::Sender<()>,
386    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
387    /// Called every time a peg-in was claimed
388    pegin_claimed_sender: watch::Sender<()>,
389    pegin_claimed_receiver: watch::Receiver<()>,
390    task_group: TaskGroup,
391    admin_auth: Option<ApiAuth>,
392}
393
394#[apply(async_trait_maybe_send!)]
395impl ClientModule for WalletClientModule {
396    type Init = WalletClientInit;
397    type Common = WalletModuleTypes;
398    type Backup = WalletModuleBackup;
399    type ModuleStateMachineContext = WalletClientContext;
400    type States = WalletClientStates;
401
402    fn context(&self) -> Self::ModuleStateMachineContext {
403        WalletClientContext {
404            rpc: self.rpc.clone(),
405            wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
406            wallet_decoder: self.decoder(),
407            secp: Secp256k1::default(),
408            client_ctx: self.client_ctx.clone(),
409        }
410    }
411
412    async fn start(&self) {
413        self.task_group.spawn_cancellable("peg-in monitor", {
414            let client_ctx = self.client_ctx.clone();
415            let db = self.db.clone();
416            let btc_rpc = self.rpc.clone();
417            let module_api = self.module_api.clone();
418            let data = self.data.clone();
419            let pegin_claimed_sender = self.pegin_claimed_sender.clone();
420            let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
421            pegin_monitor::run_peg_in_monitor(
422                client_ctx,
423                db,
424                btc_rpc,
425                module_api,
426                data,
427                pegin_claimed_sender,
428                pegin_monitor_wakeup_receiver,
429            )
430        });
431
432        self.task_group
433            .spawn_cancellable("supports-safe-deposit-version", {
434                let db = self.db.clone();
435                let module_api = self.module_api.clone();
436
437                poll_supports_safe_deposit_version(db, module_api)
438            });
439    }
440
441    fn supports_backup(&self) -> bool {
442        true
443    }
444
445    async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
446        // fetch consensus height first
447        let session_count = self.client_ctx.global_api().session_count().await?;
448
449        let mut dbtx = self.db.begin_transaction_nc().await;
450        let next_pegin_tweak_idx = dbtx
451            .get_value(&NextPegInTweakIndexKey)
452            .await
453            .unwrap_or_default();
454        let claimed = dbtx
455            .find_by_prefix(&PegInTweakIndexPrefix)
456            .await
457            .filter_map(|(k, v)| async move {
458                if v.claimed.is_empty() {
459                    None
460                } else {
461                    Some(k.0)
462                }
463            })
464            .collect()
465            .await;
466        Ok(backup::WalletModuleBackup::new_v1(
467            session_count,
468            next_pegin_tweak_idx,
469            claimed,
470        ))
471    }
472
473    fn input_fee(
474        &self,
475        _amount: Amount,
476        _input: &<Self::Common as ModuleCommon>::Input,
477    ) -> Option<Amount> {
478        Some(self.cfg().fee_consensus.peg_in_abs)
479    }
480
481    fn output_fee(
482        &self,
483        _amount: Amount,
484        _output: &<Self::Common as ModuleCommon>::Output,
485    ) -> Option<Amount> {
486        Some(self.cfg().fee_consensus.peg_out_abs)
487    }
488
489    #[cfg(feature = "cli")]
490    async fn handle_cli_command(
491        &self,
492        args: &[std::ffi::OsString],
493    ) -> anyhow::Result<serde_json::Value> {
494        cli::handle_cli_command(self, args).await
495    }
496}
497
498#[derive(Debug, Clone)]
499pub struct WalletClientContext {
500    rpc: DynBitcoindRpc,
501    wallet_descriptor: PegInDescriptor,
502    wallet_decoder: Decoder,
503    secp: Secp256k1<All>,
504    pub client_ctx: ClientContext<WalletClientModule>,
505}
506
507impl Context for WalletClientContext {
508    const KIND: Option<ModuleKind> = Some(KIND);
509}
510
511impl WalletClientModule {
512    fn cfg(&self) -> &WalletClientConfig {
513        &self.data.cfg
514    }
515
516    fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
517        if let Ok(rpc_config) = BitcoinRpcConfig::get_defaults_from_env_vars() {
518            // TODO: Wallet client cannot support bitcoind RPC until the bitcoin dep is
519            // updated to 0.30
520            if rpc_config.kind == "bitcoind" {
521                cfg.default_bitcoin_rpc.clone()
522            } else {
523                rpc_config
524            }
525        } else {
526            cfg.default_bitcoin_rpc.clone()
527        }
528    }
529
530    pub fn get_network(&self) -> Network {
531        self.cfg().network.0
532    }
533
534    pub fn get_fee_consensus(&self) -> FeeConsensus {
535        self.cfg().fee_consensus
536    }
537
538    async fn allocate_deposit_address_inner(
539        &self,
540        dbtx: &mut DatabaseTransaction<'_>,
541    ) -> (OperationId, Address, TweakIdx) {
542        dbtx.ensure_isolated().expect("Must be isolated db");
543
544        let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
545        let (_secret_tweak_key, _, address, operation_id) =
546            self.data.derive_deposit_address(tweak_idx);
547
548        let now = fedimint_core::time::now();
549
550        dbtx.insert_new_entry(
551            &PegInTweakIndexKey(tweak_idx),
552            &PegInTweakIndexData {
553                creation_time: now,
554                next_check_time: Some(now),
555                last_check_time: None,
556                operation_id,
557                claimed: vec![],
558            },
559        )
560        .await;
561
562        (operation_id, address, tweak_idx)
563    }
564
565    /// Fetches the fees that would need to be paid to make the withdraw request
566    /// using [`Self::withdraw`] work *right now*.
567    ///
568    /// Note that we do not receive a guarantee that these fees will be valid in
569    /// the future, thus even the next second using these fees *may* fail.
570    /// The caller should be prepared to retry with a new fee estimate.
571    pub async fn get_withdraw_fees(
572        &self,
573        address: &bitcoin::Address,
574        amount: bitcoin::Amount,
575    ) -> anyhow::Result<PegOutFees> {
576        self.module_api
577            .fetch_peg_out_fees(address, amount)
578            .await?
579            .context("Federation didn't return peg-out fees")
580    }
581
582    /// Returns a summary of the wallet's coins
583    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
584        Ok(self.module_api.fetch_wallet_summary().await?)
585    }
586
587    pub fn create_withdraw_output(
588        &self,
589        operation_id: OperationId,
590        address: bitcoin::Address,
591        amount: bitcoin::Amount,
592        fees: PegOutFees,
593    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
594        let output = WalletOutput::new_v0_peg_out(address, amount, fees);
595
596        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
597
598        let sm_gen = move |out_point_range: OutPointRange| {
599            assert_eq!(out_point_range.count(), 1);
600            let out_idx = out_point_range.start_idx();
601            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
602                operation_id,
603                state: WithdrawStates::Created(CreatedWithdrawState {
604                    fm_outpoint: OutPoint {
605                        txid: out_point_range.txid(),
606                        out_idx,
607                    },
608                }),
609            })]
610        };
611
612        Ok(ClientOutputBundle::new(
613            vec![ClientOutput::<WalletOutput> { output, amount }],
614            vec![ClientOutputSM::<WalletClientStates> {
615                state_machines: Arc::new(sm_gen),
616            }],
617        ))
618    }
619
620    pub fn create_rbf_withdraw_output(
621        &self,
622        operation_id: OperationId,
623        rbf: &Rbf,
624    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
625        let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
626
627        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
628
629        let sm_gen = move |out_point_range: OutPointRange| {
630            assert_eq!(out_point_range.count(), 1);
631            let out_idx = out_point_range.start_idx();
632            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
633                operation_id,
634                state: WithdrawStates::Created(CreatedWithdrawState {
635                    fm_outpoint: OutPoint {
636                        txid: out_point_range.txid(),
637                        out_idx,
638                    },
639                }),
640            })]
641        };
642
643        Ok(ClientOutputBundle::new(
644            vec![ClientOutput::<WalletOutput> { output, amount }],
645            vec![ClientOutputSM::<WalletClientStates> {
646                state_machines: Arc::new(sm_gen),
647            }],
648        ))
649    }
650
651    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
652        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
653    }
654
655    /// Returns true if the federation's wallet module consensus version
656    /// supports processing all deposits.
657    ///
658    /// This method is safe to call offline, since it first attempts to read a
659    /// key from the db that represents the client has previously been able to
660    /// verify the wallet module consensus version. If the client has not
661    /// verified the version, it must be online to fetch the latest wallet
662    /// module consensus version.
663    pub async fn supports_safe_deposit(&self) -> bool {
664        let mut dbtx = self.db.begin_transaction().await;
665
666        let already_verified_supports_safe_deposit =
667            dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
668
669        already_verified_supports_safe_deposit || {
670            match self.module_api.module_consensus_version().await {
671                Ok(module_consensus_version) => {
672                    let supported_version =
673                        SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
674
675                    if supported_version {
676                        dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
677                        dbtx.commit_tx().await;
678                    }
679
680                    supported_version
681                }
682                Err(_) => false,
683            }
684        }
685    }
686
687    /// Allocates a deposit address controlled by the federation, guaranteeing
688    /// safe handling of all deposits, including on-chain transactions exceeding
689    /// `ALEPH_BFT_UNIT_BYTE_LIMIT`.
690    ///
691    /// Returns an error if the client has never been online to verify the
692    /// federation's wallet module consensus version supports processing all
693    /// deposits.
694    pub async fn safe_allocate_deposit_address<M>(
695        &self,
696        extra_meta: M,
697    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
698    where
699        M: Serialize + MaybeSend + MaybeSync,
700    {
701        ensure!(
702            self.supports_safe_deposit().await,
703            "Wallet module consensus version doesn't support safe deposits",
704        );
705
706        self.allocate_deposit_address_expert_only(extra_meta).await
707    }
708
709    /// Allocates a deposit address that is controlled by the federation.
710    ///
711    /// This is an EXPERT ONLY method intended for power users such as Lightning
712    /// gateways allocating liquidity, and we discourage exposing peg-in
713    /// functionality to everyday users of a Fedimint wallet due to the
714    /// following two limitations:
715    ///
716    /// The transaction sending to this address needs to be smaller than 40KB in
717    /// order for the peg-in to be claimable. If the transaction is too large,
718    /// funds will be lost.
719    ///
720    /// In the future, federations will also enforce a minimum peg-in amount to
721    /// prevent accumulation of dust UTXOs. Peg-ins under this minimum cannot be
722    /// claimed and funds will be lost.
723    ///
724    /// Everyday users should rely on Lightning to move funds into the
725    /// federation.
726    pub async fn allocate_deposit_address_expert_only<M>(
727        &self,
728        extra_meta: M,
729    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
730    where
731        M: Serialize + MaybeSend + MaybeSync,
732    {
733        let extra_meta_value =
734            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
735        let (operation_id, address, tweak_idx) = self
736            .db
737            .autocommit(
738                move |dbtx, _| {
739                    let extra_meta_value_inner = extra_meta_value.clone();
740                    Box::pin(async move {
741                        let (operation_id, address, tweak_idx) = self
742                            .allocate_deposit_address_inner(dbtx)
743                            .await;
744
745                        self.client_ctx.manual_operation_start_dbtx(
746                            dbtx,
747                            operation_id,
748                            WalletCommonInit::KIND.as_str(),
749                            WalletOperationMeta {
750                                variant: WalletOperationMetaVariant::Deposit {
751                                    address: address.clone().into_unchecked(),
752                                    tweak_idx: Some(tweak_idx),
753                                    expires_at: None,
754                                },
755                                extra_meta: extra_meta_value_inner,
756                            },
757                            vec![]
758                        ).await?;
759
760                        debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
761
762                        // Begin watching the script address
763                        self.rpc
764                            .watch_script_history(&address.script_pubkey())
765                            .await?;
766
767                        let sender = self.pegin_monitor_wakeup_sender.clone();
768                        dbtx.on_commit(move || {
769                            let _ = sender.send(());
770                        });
771
772                        Ok((operation_id, address, tweak_idx))
773                    })
774                },
775                Some(100),
776            )
777            .await
778            .map_err(|e| match e {
779                AutocommitError::CommitFailed {
780                    last_error,
781                    attempts,
782                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
783                AutocommitError::ClosureError { error, .. } => error,
784            })?;
785
786        Ok((operation_id, address, tweak_idx))
787    }
788
789    /// Returns a stream of updates about an ongoing deposit operation created
790    /// with [`WalletClientModule::allocate_deposit_address_expert_only`].
791    /// Returns an error for old deposit operations created prior to the 0.4
792    /// release and not driven to completion yet. This should be rare enough
793    /// that an indeterminate state is ok here.
794    pub async fn subscribe_deposit(
795        &self,
796        operation_id: OperationId,
797    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
798        let operation = self
799            .client_ctx
800            .get_operation(operation_id)
801            .await
802            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
803
804        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
805            bail!("Operation is not a wallet operation");
806        }
807
808        let operation_meta = operation.meta::<WalletOperationMeta>();
809
810        let WalletOperationMetaVariant::Deposit {
811            address, tweak_idx, ..
812        } = operation_meta.variant
813        else {
814            bail!("Operation is not a deposit operation");
815        };
816
817        let address = address.require_network(self.cfg().network.0)?;
818
819        // The old deposit operations don't have tweak_idx set
820        let Some(tweak_idx) = tweak_idx else {
821            // In case we are dealing with an old deposit that still uses state machines we
822            // don't have the logic here anymore to subscribe to updates. We can still read
823            // the final state though if it reached any.
824            let outcome_v1 = operation
825                .outcome::<DepositStateV1>()
826                .context("Old pending deposit, can't subscribe to updates")?;
827
828            let outcome_v2 = match outcome_v1 {
829                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
830                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
831                    btc_out_point: bitcoin::OutPoint {
832                        txid: tx_info.btc_transaction.compute_txid(),
833                        vout: tx_info.out_idx,
834                    },
835                },
836                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
837                _ => bail!("Non-final outcome in operation log"),
838            };
839
840            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
841        };
842
843        Ok(self.client_ctx.outcome_or_updates(&operation, operation_id, || {
844            let stream_rpc = self.rpc.clone();
845            let stream_client_ctx = self.client_ctx.clone();
846            let stream_script_pub_key = address.script_pubkey();
847
848            stream! {
849                yield DepositStateV2::WaitingForTransaction;
850
851                retry(
852                    "subscribe script history",
853                    background_backoff(),
854                    || stream_rpc.watch_script_history(&stream_script_pub_key)
855                ).await.expect("Will never give up");
856                let (btc_out_point, btc_deposited) = retry(
857                    "fetch history",
858                    background_backoff(),
859                    || async {
860                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
861                        history.first().and_then(|tx| {
862                            let (out_idx, amount) = tx.output
863                                .iter()
864                                .enumerate()
865                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
866                            let txid = tx.compute_txid();
867
868                            Some((
869                                bitcoin::OutPoint {
870                                    txid,
871                                    vout: out_idx as u32,
872                                },
873                                amount
874                            ))
875                        }).context("No deposit transaction found")
876                    }
877                ).await.expect("Will never give up");
878
879                yield DepositStateV2::WaitingForConfirmation {
880                    btc_deposited,
881                    btc_out_point
882                };
883
884                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
885                    peg_in_index: tweak_idx,
886                    btc_out_point,
887                }).await;
888
889                yield DepositStateV2::Confirmed {
890                    btc_deposited,
891                    btc_out_point
892                };
893
894                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
895                    Ok(()) => yield DepositStateV2::Claimed {
896                        btc_deposited,
897                        btc_out_point
898                    },
899                    Err(e) => yield DepositStateV2::Failed(e.to_string())
900                }
901            }
902        }))
903    }
904
905    pub async fn find_tweak_idx_by_address(
906        &self,
907        address: bitcoin::Address<NetworkUnchecked>,
908    ) -> anyhow::Result<TweakIdx> {
909        let data = self.data.clone();
910        let Some((tweak_idx, _)) = self
911            .db
912            .begin_transaction_nc()
913            .await
914            .find_by_prefix(&PegInTweakIndexPrefix)
915            .await
916            .filter(|(k, _)| {
917                let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
918                future::ready(derived_address.into_unchecked() == address)
919            })
920            .next()
921            .await
922        else {
923            bail!("Address not found in the list of derived keys");
924        };
925
926        Ok(tweak_idx.0)
927    }
928    pub async fn find_tweak_idx_by_operation_id(
929        &self,
930        operation_id: OperationId,
931    ) -> anyhow::Result<TweakIdx> {
932        Ok(self
933            .client_ctx
934            .module_db()
935            .clone()
936            .begin_transaction_nc()
937            .await
938            .find_by_prefix(&PegInTweakIndexPrefix)
939            .await
940            .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
941            .next()
942            .await
943            .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
944            .0
945             .0)
946    }
947
948    pub async fn get_pegin_tweak_idx(
949        &self,
950        tweak_idx: TweakIdx,
951    ) -> anyhow::Result<PegInTweakIndexData> {
952        self.client_ctx
953            .module_db()
954            .clone()
955            .begin_transaction_nc()
956            .await
957            .get_value(&PegInTweakIndexKey(tweak_idx))
958            .await
959            .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
960    }
961
962    pub async fn get_claimed_pegins(
963        &self,
964        dbtx: &mut DatabaseTransaction<'_>,
965        tweak_idx: TweakIdx,
966    ) -> Vec<(
967        bitcoin::OutPoint,
968        TransactionId,
969        Vec<fedimint_core::OutPoint>,
970    )> {
971        let outpoints = dbtx
972            .get_value(&PegInTweakIndexKey(tweak_idx))
973            .await
974            .map(|v| v.claimed)
975            .unwrap_or_default();
976
977        let mut res = vec![];
978
979        for outpoint in outpoints {
980            let claimed_peg_in_data = dbtx
981                .get_value(&ClaimedPegInKey {
982                    peg_in_index: tweak_idx,
983                    btc_out_point: outpoint,
984                })
985                .await
986                .expect("Must have a corresponding claim record");
987            res.push((
988                outpoint,
989                claimed_peg_in_data.claim_txid,
990                claimed_peg_in_data.change,
991            ));
992        }
993
994        res
995    }
996
997    /// Like [`Self::recheck_pegin_address`] but by `operation_id`
998    pub async fn recheck_pegin_address_by_op_id(
999        &self,
1000        operation_id: OperationId,
1001    ) -> anyhow::Result<()> {
1002        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1003
1004        self.recheck_pegin_address(tweak_idx).await
1005    }
1006
1007    /// Schedule given address for immediate re-check for deposits
1008    pub async fn recheck_pegin_address_by_address(
1009        &self,
1010        address: bitcoin::Address<NetworkUnchecked>,
1011    ) -> anyhow::Result<()> {
1012        self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1013            .await
1014    }
1015
1016    /// Schedule given address for immediate re-check for deposits
1017    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1018        self.db
1019            .autocommit(
1020                |dbtx, _| {
1021                    Box::pin(async {
1022                        let db_key = PegInTweakIndexKey(tweak_idx);
1023                        let db_val = dbtx
1024                            .get_value(&db_key)
1025                            .await
1026                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1027
1028                        dbtx.insert_entry(
1029                            &db_key,
1030                            &PegInTweakIndexData {
1031                                next_check_time: Some(fedimint_core::time::now()),
1032                                ..db_val
1033                            },
1034                        )
1035                        .await;
1036
1037                        let sender = self.pegin_monitor_wakeup_sender.clone();
1038                        dbtx.on_commit(move || {
1039                            let _ = sender.send(());
1040                        });
1041
1042                        Ok::<_, anyhow::Error>(())
1043                    })
1044                },
1045                Some(100),
1046            )
1047            .await?;
1048
1049        Ok(())
1050    }
1051
1052    /// Await for num deposit by [`OperationId`]
1053    pub async fn await_num_deposits_by_operation_id(
1054        &self,
1055        operation_id: OperationId,
1056        num_deposits: usize,
1057    ) -> anyhow::Result<()> {
1058        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1059        self.await_num_deposits(tweak_idx, num_deposits).await
1060    }
1061
1062    pub async fn await_num_deposits_by_address(
1063        &self,
1064        address: bitcoin::Address<NetworkUnchecked>,
1065        num_deposits: usize,
1066    ) -> anyhow::Result<()> {
1067        self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1068            .await
1069    }
1070
1071    #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1072    pub async fn await_num_deposits(
1073        &self,
1074        tweak_idx: TweakIdx,
1075        num_deposits: usize,
1076    ) -> anyhow::Result<()> {
1077        let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1078
1079        let mut receiver = self.pegin_claimed_receiver.clone();
1080        let mut backoff = backoff_util::aggressive_backoff();
1081
1082        loop {
1083            let pegins = self
1084                .get_claimed_pegins(
1085                    &mut self.client_ctx.module_db().begin_transaction_nc().await,
1086                    tweak_idx,
1087                )
1088                .await;
1089
1090            if pegins.len() < num_deposits {
1091                debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1092                self.recheck_pegin_address(tweak_idx).await?;
1093                runtime::sleep(backoff.next().unwrap_or_default()).await;
1094                receiver.changed().await?;
1095                continue;
1096            }
1097
1098            debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1099
1100            for (_outpoint, transaction_id, change) in pegins {
1101                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1102                let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1103
1104                if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1105                    bail!("{}", e);
1106                }
1107
1108                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1109                self.client_ctx
1110                    .await_primary_module_outputs(operation_id, change)
1111                    .await
1112                    .expect("Cannot fail if tx was accepted and federation is honest");
1113            }
1114
1115            return Ok(());
1116        }
1117    }
1118
1119    /// Attempt to withdraw a given `amount` of Bitcoin to a destination
1120    /// `address`. The caller has to supply the fee rate to be used which can be
1121    /// fetched using [`Self::get_withdraw_fees`] and should be
1122    /// acknowledged by the user since it can be unexpectedly high.
1123    pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1124        &self,
1125        address: &bitcoin::Address,
1126        amount: bitcoin::Amount,
1127        fee: PegOutFees,
1128        extra_meta: M,
1129    ) -> anyhow::Result<OperationId> {
1130        {
1131            let operation_id = OperationId(thread_rng().gen());
1132
1133            let withdraw_output =
1134                self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1135            let tx_builder = TransactionBuilder::new()
1136                .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1137
1138            let extra_meta =
1139                serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1140            self.client_ctx
1141                .finalize_and_submit_transaction(
1142                    operation_id,
1143                    WalletCommonInit::KIND.as_str(),
1144                    {
1145                        let address = address.clone();
1146                        move |change_range: OutPointRange| WalletOperationMeta {
1147                            variant: WalletOperationMetaVariant::Withdraw {
1148                                address: address.clone().into_unchecked(),
1149                                amount,
1150                                fee,
1151                                change: change_range.into_iter().collect(),
1152                            },
1153                            extra_meta: extra_meta.clone(),
1154                        }
1155                    },
1156                    tx_builder,
1157                )
1158                .await?;
1159
1160            Ok(operation_id)
1161        }
1162    }
1163
1164    /// Attempt to increase the fee of a onchain withdraw transaction using
1165    /// replace by fee (RBF).
1166    /// This can prevent transactions from getting stuck
1167    /// in the mempool
1168    #[deprecated(
1169        since = "0.4.0",
1170        note = "RBF withdrawals are rejected by the federation"
1171    )]
1172    pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1173        &self,
1174        rbf: Rbf,
1175        extra_meta: M,
1176    ) -> anyhow::Result<OperationId> {
1177        let operation_id = OperationId(thread_rng().gen());
1178
1179        let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1180        let tx_builder = TransactionBuilder::new()
1181            .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1182
1183        let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1184        self.client_ctx
1185            .finalize_and_submit_transaction(
1186                operation_id,
1187                WalletCommonInit::KIND.as_str(),
1188                move |change_range: OutPointRange| WalletOperationMeta {
1189                    variant: WalletOperationMetaVariant::RbfWithdraw {
1190                        rbf: rbf.clone(),
1191                        change: change_range.into_iter().collect(),
1192                    },
1193                    extra_meta: extra_meta.clone(),
1194                },
1195                tx_builder,
1196            )
1197            .await?;
1198
1199        Ok(operation_id)
1200    }
1201
1202    pub async fn subscribe_withdraw_updates(
1203        &self,
1204        operation_id: OperationId,
1205    ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1206        let operation = self
1207            .client_ctx
1208            .get_operation(operation_id)
1209            .await
1210            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1211
1212        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1213            bail!("Operation is not a wallet operation");
1214        }
1215
1216        let operation_meta = operation.meta::<WalletOperationMeta>();
1217
1218        let (WalletOperationMetaVariant::Withdraw { change, .. }
1219        | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1220        else {
1221            bail!("Operation is not a withdraw operation");
1222        };
1223
1224        let mut operation_stream = self.notifier.subscribe(operation_id).await;
1225        let client_ctx = self.client_ctx.clone();
1226
1227        Ok(self
1228            .client_ctx
1229            .outcome_or_updates(&operation, operation_id, || {
1230                stream! {
1231                    match next_withdraw_state(&mut operation_stream).await {
1232                        Some(WithdrawStates::Created(_)) => {
1233                            yield WithdrawState::Created;
1234                        },
1235                        Some(s) => {
1236                            panic!("Unexpected state {s:?}")
1237                        },
1238                        None => return,
1239                    }
1240
1241                    // TODO: get rid of awaiting change here, there has to be a better way to make tests deterministic
1242
1243                        // Swallowing potential errors since the transaction failing  is handled by
1244                        // output outcome fetching already
1245                        let _ = client_ctx
1246                            .await_primary_module_outputs(operation_id, change)
1247                            .await;
1248
1249
1250                    match next_withdraw_state(&mut operation_stream).await {
1251                        Some(WithdrawStates::Aborted(inner)) => {
1252                            yield WithdrawState::Failed(inner.error);
1253                        },
1254                        Some(WithdrawStates::Success(inner)) => {
1255                            yield WithdrawState::Succeeded(inner.txid);
1256                        },
1257                        Some(s) => {
1258                            panic!("Unexpected state {s:?}")
1259                        },
1260                        None => {},
1261                    }
1262                }
1263            }))
1264    }
1265
1266    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1267        self.admin_auth
1268            .clone()
1269            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1270    }
1271
1272    pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1273        self.module_api
1274            .activate_consensus_version_voting(self.admin_auth()?)
1275            .await?;
1276
1277        Ok(())
1278    }
1279}
1280
1281/// Polls the federation checking if the activated module consensus version
1282/// supports safe deposits, saving the result in the db once it does.
1283async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1284    loop {
1285        let mut dbtx = db.begin_transaction().await;
1286
1287        if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1288            break;
1289        }
1290
1291        if let Ok(module_consensus_version) = module_api.module_consensus_version().await {
1292            if SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version {
1293                dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1294                dbtx.commit_tx().await;
1295                break;
1296            }
1297        }
1298
1299        drop(dbtx);
1300
1301        if is_running_in_test_env() {
1302            sleep(Duration::from_secs(1)).await;
1303        } else {
1304            sleep(Duration::from_secs(3600)).await;
1305        }
1306    }
1307}
1308
1309/// Returns the child index to derive the next peg-in tweak key from.
1310async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1311    let index = dbtx
1312        .get_value(&NextPegInTweakIndexKey)
1313        .await
1314        .unwrap_or_default();
1315    dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1316        .await;
1317    index
1318}
1319
/// State machines of the wallet client module, one variant per kind of state
/// machine.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
    /// Wraps a [`DepositStateMachine`].
    Deposit(DepositStateMachine),
    /// Wraps a [`WithdrawStateMachine`].
    Withdraw(WithdrawStateMachine),
}
1325
/// Converts [`WalletClientStates`] into the type-erased [`DynState`].
impl IntoDynInstance for WalletClientStates {
    type DynType = DynState;

    /// Wraps `self` into a [`DynState`] tagged with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1333
1334impl State for WalletClientStates {
1335    type ModuleContext = WalletClientContext;
1336
1337    fn transitions(
1338        &self,
1339        context: &Self::ModuleContext,
1340        global_context: &DynGlobalClientContext,
1341    ) -> Vec<StateTransition<Self>> {
1342        match self {
1343            WalletClientStates::Deposit(sm) => {
1344                sm_enum_variant_translation!(
1345                    sm.transitions(context, global_context),
1346                    WalletClientStates::Deposit
1347                )
1348            }
1349            WalletClientStates::Withdraw(sm) => {
1350                sm_enum_variant_translation!(
1351                    sm.transitions(context, global_context),
1352                    WalletClientStates::Withdraw
1353                )
1354            }
1355        }
1356    }
1357
1358    fn operation_id(&self) -> OperationId {
1359        match self {
1360            WalletClientStates::Deposit(sm) => sm.operation_id(),
1361            WalletClientStates::Withdraw(sm) => sm.operation_id(),
1362        }
1363    }
1364}
1365
// Sanity tests for `recover_scan_idxes_for_activity`, driven by closures that
// fake the per-index lookup.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        recover_scan_idxes_for_activity, RecoverScanOutcome, RECOVER_NUM_IDX_ADD_TO_LAST_USED,
    };

    /// Runs `recover_scan_idxes_for_activity` against several fake lookup
    /// closures and compares the returned `RecoverScanOutcome` with the
    /// expected one. Arms that panic mark indexes the scan must never query.
    #[allow(clippy::too_many_lines)] // shut-up clippy, it's a test
    #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Start at 0, nothing previously used, no activity anywhere: index 9
        // must be queried and index 10 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(9) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(10) => panic!("Shouldn't happen"),
                            TweakIdx(11) => {
                                vec![0usize] /* just for type inference */
                            }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Start at 0 with indexes 1 and 2 already marked as used: those two
        // must not be queried, index 11 must be queried and index 12 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                            TweakIdx(11) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(12) => panic!("Shouldn't happen"),
                            TweakIdx(13) => {
                                vec![0usize] /* just for type inference */
                            }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(2)),
                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Start at 10 with activity reported at index 10 only: index 19 must
        // be queried and index 20 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(10),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(10) => vec![()],
                            TweakIdx(19) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(20) => panic!("Shouldn't happen"),
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(10)),
                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Start at 0 with activity at indexes 6 and 15: both are reported and
        // 15 becomes the last used index.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(6 | 15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
            }
        );
        // Start at 10 with no activity found: index 9 must not be queried and
        // the expected new start index is `9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED`.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(8) => {
                        vec![()] /* for type inference only */
                    }
                    TweakIdx(9) => {
                        panic!("Shouldn't happen")
                    }
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([])
            }
        );
        // Start at 10 with activity at index 15: index 9 must not be queried
        // and 15 becomes the last used index.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    TweakIdx(15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
            }
        );
    }
}