1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18pub mod events;
19mod pegin_monitor;
21mod withdraw;
22
23use std::collections::BTreeMap;
24use std::future;
25use std::sync::Arc;
26use std::time::{Duration, SystemTime};
27
28use anyhow::{anyhow, bail, ensure, Context as AnyhowContext};
29use async_stream::stream;
30use backup::WalletModuleBackup;
31use bitcoin::address::NetworkUnchecked;
32use bitcoin::secp256k1::{All, Secp256k1, SECP256K1};
33use bitcoin::{Address, Network, ScriptBuf};
34use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
35use fedimint_api_client::api::{DynModuleApi, FederationResult};
36use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
37use fedimint_client::derivable_secret::{ChildId, DerivableSecret};
38use fedimint_client::module::init::{
39 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
42use fedimint_client::oplog::UpdateStreamOrOutcome;
43use fedimint_client::sm::util::MapStateTransitions;
44use fedimint_client::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
45use fedimint_client::transaction::{
46 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
47};
48use fedimint_client::{sm_enum_variant_translation, DynGlobalClientContext};
49use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
50use fedimint_core::db::{
51 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::{Decodable, Encodable};
54use fedimint_core::envs::{is_running_in_test_env, BitcoinRpcConfig};
55use fedimint_core::module::{
56 ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion, ModuleInit,
57 MultiApiVersion,
58};
59use fedimint_core::task::{sleep, MaybeSend, MaybeSync, TaskGroup};
60use fedimint_core::util::backoff_util::background_backoff;
61use fedimint_core::util::{backoff_util, retry};
62use fedimint_core::{
63 apply, async_trait_maybe_send, push_db_pair_items, runtime, secp256k1, Amount, OutPoint,
64 TransactionId,
65};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
68use fedimint_wallet_common::tweakable::Tweakable;
69pub use fedimint_wallet_common::*;
70use futures::{Stream, StreamExt};
71use rand::{thread_rng, Rng};
72use secp256k1::Keypair;
73use serde::{Deserialize, Serialize};
74use strum::IntoEnumIterator;
75use tokio::sync::watch;
76use tracing::{debug, instrument};
77
78use crate::api::WalletFederationApi;
79use crate::backup::WalletRecovery;
80use crate::client_db::{
81 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
82 PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
83};
84use crate::deposit::DepositStateMachine;
85use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
86
/// Child id applied to the module root secret before the per-address tweak
/// index when deriving peg-in tweak keys.
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
88
/// A bitcoin transaction paired with the index of one of its outputs.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
    /// The full bitcoin transaction.
    pub btc_transaction: bitcoin::Transaction,
    /// Index into `btc_transaction.output` of the output of interest.
    pub out_idx: u32,
}
97
/// First-generation deposit state.
///
/// Within this module it is only read back as a stored operation outcome by
/// `subscribe_deposit`, which accepts the `Claimed` and `Failed` variants and
/// rejects the others as non-final.
// NOTE(review): presumably only written by older client versions — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV1 {
    WaitingForTransaction,
    WaitingForConfirmation(BitcoinTransactionData),
    Confirmed(BitcoinTransactionData),
    Claimed(BitcoinTransactionData),
    Failed(String),
}
106
/// Deposit progress as reported by `subscribe_deposit`.
///
/// The `btc_deposited` amounts are (de)serialized as satoshis.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV2 {
    /// Emitted first, before any matching transaction has been looked up.
    WaitingForTransaction,
    /// A transaction paying to the deposit script was found in the script
    /// history.
    WaitingForConfirmation {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// A claim record for the deposit's out point exists in the module
    /// database.
    Confirmed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// The outputs of the claim transaction were successfully awaited.
    Claimed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// Awaiting the claim outputs failed; carries the error text.
    Failed(String),
}
127
/// Withdrawal progress as reported by `subscribe_withdraw_updates`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
    /// The withdraw state machine reported its created state.
    Created,
    /// The withdrawal succeeded; carries the bitcoin transaction id.
    Succeeded(bitcoin::Txid),
    /// The withdrawal was aborted; carries the error text.
    Failed(String),
}
137
138async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
139where
140 S: Stream<Item = WalletClientStates> + Unpin,
141{
142 loop {
143 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
144 return Some(ds.state);
145 }
146 tokio::task::yield_now().await;
147 }
148}
149
/// Initializer for the wallet client module.
///
/// The wrapped option is an explicit bitcoin RPC configuration; when it is
/// `None`, `init` falls back to `WalletClientModule::get_rpc_config`.
#[derive(Debug, Clone, Default)]
pub struct WalletClientInit(pub Option<BitcoinRpcConfig>);
153
154impl WalletClientInit {
155 pub fn new(rpc: BitcoinRpcConfig) -> Self {
156 Self(Some(rpc))
157 }
158}
159
impl ModuleInit for WalletClientInit {
    type Common = WalletCommonInit;

    /// Collects the wallet client's database contents into a name-to-value
    /// map for inspection.
    ///
    /// `prefix_names` restricts the dump to the listed tables, compared
    /// against the lowercased display name of each `DbKeyPrefix`; an empty
    /// list selects every table.
    async fn dump_database(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        prefix_names: Vec<String>,
    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
            BTreeMap::new();
        // An empty filter list means "dump everything".
        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
        });

        for table in filtered_prefixes {
            match table {
                // Single-value table: only present once an index was stored.
                DbKeyPrefix::NextPegInTweakIndex => {
                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
                        wallet_client_items
                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
                    }
                }
                DbKeyPrefix::PegInTweakIndex => {
                    push_db_pair_items!(
                        dbtx,
                        PegInTweakIndexPrefix,
                        PegInTweakIndexKey,
                        PegInTweakIndexData,
                        wallet_client_items,
                        "Peg-In Tweak Index"
                    );
                }
                DbKeyPrefix::ClaimedPegIn => {
                    push_db_pair_items!(
                        dbtx,
                        ClaimedPegInPrefix,
                        ClaimedPegInKey,
                        ClaimedPegInData,
                        wallet_client_items,
                        "Claimed Peg-In"
                    );
                }
                // Single-value table, same handling as `NextPegInTweakIndex`.
                DbKeyPrefix::RecoveryFinalized => {
                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
                    }
                }
                DbKeyPrefix::SupportsSafeDeposit => {
                    push_db_pair_items!(
                        dbtx,
                        SupportsSafeDepositPrefix,
                        SupportsSafeDepositKey,
                        (),
                        wallet_client_items,
                        "Supports Safe Deposit"
                    );
                }
                // These prefixes are intentionally not dumped.
                DbKeyPrefix::RecoveryState
                | DbKeyPrefix::ExternalReservedStart
                | DbKeyPrefix::CoreInternalReservedStart
                | DbKeyPrefix::CoreInternalReservedEnd => {}
            }
        }

        Box::new(wallet_client_items.into_iter())
    }
}
227
228#[apply(async_trait_maybe_send!)]
229impl ClientModuleInit for WalletClientInit {
230 type Module = WalletClientModule;
231
232 fn supported_api_versions(&self) -> MultiApiVersion {
233 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
234 .expect("no version conflicts")
235 }
236
237 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
238 let data = WalletClientModuleData {
239 cfg: args.cfg().clone(),
240 module_root_secret: args.module_root_secret().clone(),
241 };
242
243 let rpc_config = self
244 .0
245 .clone()
246 .unwrap_or(WalletClientModule::get_rpc_config(args.cfg()));
247
248 let db = args.db().clone();
249
250 let btc_rpc = create_bitcoind(&rpc_config)?;
251 let module_api = args.module_api().clone();
252
253 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
254 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
255
256 Ok(WalletClientModule {
257 db,
258 data,
259 module_api,
260 notifier: args.notifier().clone(),
261 rpc: btc_rpc,
262 client_ctx: args.context(),
263 pegin_monitor_wakeup_sender,
264 pegin_monitor_wakeup_receiver,
265 pegin_claimed_receiver,
266 pegin_claimed_sender,
267 task_group: args.task_group().clone(),
268 admin_auth: args.admin_auth().cloned(),
269 })
270 }
271
272 async fn recover(
280 &self,
281 args: &ClientModuleRecoverArgs<Self>,
282 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
283 ) -> anyhow::Result<()> {
284 args.recover_from_history::<WalletRecovery>(self, snapshot)
285 .await
286 }
287}
288
/// Metadata stored in the operation log for every wallet operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
    /// What kind of wallet operation this is, with its specific details.
    pub variant: WalletOperationMetaVariant,
    /// Caller-supplied metadata, serialized to JSON when the operation is
    /// created.
    pub extra_meta: serde_json::Value,
}
294
/// Operation-specific part of [`WalletOperationMeta`].
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
    /// A deposit (peg-in) to a derived address.
    Deposit {
        /// The deposit address; its network is checked when it is used.
        address: Address<NetworkUnchecked>,
        /// Tweak index the address was derived from. Defaults to `None` when
        /// missing from stored data; `subscribe_deposit` then falls back to
        /// the stored `DepositStateV1` outcome.
        #[serde(default)]
        tweak_idx: Option<TweakIdx>,
        /// Optional expiry; omitted from the serialized form when `None`.
        // NOTE(review): this module only ever writes `None` here — confirm
        // whether other code still reads it.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        expires_at: Option<SystemTime>,
    },
    /// A withdrawal (peg-out) to an on-chain address.
    Withdraw {
        /// Destination address.
        address: Address<NetworkUnchecked>,
        /// Amount withdrawn, serialized as satoshis.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        amount: bitcoin::Amount,
        /// Fees paid for the peg-out.
        fee: PegOutFees,
        /// Change out points of the submitted federation transaction.
        change: Vec<OutPoint>,
    },

    /// A replace-by-fee bump of an earlier withdrawal.
    RbfWithdraw {
        /// The RBF request that was submitted.
        rbf: Rbf,
        /// Change out points of the submitted federation transaction.
        change: Vec<OutPoint>,
    },
}
322
/// Configuration and key material shared between the module and its
/// background peg-in monitor task.
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
    /// Wallet client configuration (peg-in descriptor, network, fees, ...).
    cfg: WalletClientConfig,
    /// Root secret from which all peg-in tweak keys are derived.
    module_root_secret: DerivableSecret,
}
329
330impl WalletClientModuleData {
331 fn derive_deposit_address(
332 &self,
333 idx: TweakIdx,
334 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
335 let idx = ChildId(idx.0);
336
337 let secret_tweak_key = self
338 .module_root_secret
339 .child_key(WALLET_TWEAK_CHILD_ID)
340 .child_key(idx)
341 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
342
343 let public_tweak_key = secret_tweak_key.public_key();
344
345 let address = self
346 .cfg
347 .peg_in_descriptor
348 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
349 .address(self.cfg.network.0)
350 .unwrap();
351
352 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
354
355 (secret_tweak_key, public_tweak_key, address, operation_id)
356 }
357
358 fn derive_peg_in_script(
359 &self,
360 idx: TweakIdx,
361 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
362 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
363
364 (
365 self.cfg
366 .peg_in_descriptor
367 .tweak(&secret_tweak_key.public_key(), SECP256K1)
368 .script_pubkey(),
369 address,
370 secret_tweak_key,
371 operation_id,
372 )
373 }
374}
375
/// Client side of the wallet module: derives deposit addresses, tracks
/// peg-ins and submits withdrawals.
#[derive(Debug)]
pub struct WalletClientModule {
    /// Configuration and root secret, also handed to the peg-in monitor.
    data: WalletClientModuleData,
    /// Database handle used for the module's own bookkeeping.
    db: Database,
    /// API handle for talking to the federation's wallet module.
    module_api: DynModuleApi,
    /// Source of state machine updates, subscribed to per operation.
    notifier: ModuleNotifier<WalletClientStates>,
    /// Bitcoin RPC client.
    rpc: DynBitcoindRpc,
    /// Client context giving access to operations and transactions.
    client_ctx: ClientContext<Self>,
    /// Signalled after a commit that should make the peg-in monitor re-check
    /// addresses.
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    /// Receiving end of the wake-up signal, handed to the peg-in monitor.
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Handed to the peg-in monitor task.
    // NOTE(review): presumably signalled whenever a peg-in is claimed —
    // confirm in `pegin_monitor`.
    pegin_claimed_sender: watch::Sender<()>,
    /// Waited on by `await_num_deposits` between checks for new claims.
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group that background tasks are spawned into.
    task_group: TaskGroup,
    /// Admin credentials, if the client was given any.
    admin_auth: Option<ApiAuth>,
}
393
394#[apply(async_trait_maybe_send!)]
395impl ClientModule for WalletClientModule {
396 type Init = WalletClientInit;
397 type Common = WalletModuleTypes;
398 type Backup = WalletModuleBackup;
399 type ModuleStateMachineContext = WalletClientContext;
400 type States = WalletClientStates;
401
402 fn context(&self) -> Self::ModuleStateMachineContext {
403 WalletClientContext {
404 rpc: self.rpc.clone(),
405 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
406 wallet_decoder: self.decoder(),
407 secp: Secp256k1::default(),
408 client_ctx: self.client_ctx.clone(),
409 }
410 }
411
412 async fn start(&self) {
413 self.task_group.spawn_cancellable("peg-in monitor", {
414 let client_ctx = self.client_ctx.clone();
415 let db = self.db.clone();
416 let btc_rpc = self.rpc.clone();
417 let module_api = self.module_api.clone();
418 let data = self.data.clone();
419 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
420 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
421 pegin_monitor::run_peg_in_monitor(
422 client_ctx,
423 db,
424 btc_rpc,
425 module_api,
426 data,
427 pegin_claimed_sender,
428 pegin_monitor_wakeup_receiver,
429 )
430 });
431
432 self.task_group
433 .spawn_cancellable("supports-safe-deposit-version", {
434 let db = self.db.clone();
435 let module_api = self.module_api.clone();
436
437 poll_supports_safe_deposit_version(db, module_api)
438 });
439 }
440
441 fn supports_backup(&self) -> bool {
442 true
443 }
444
445 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
446 let session_count = self.client_ctx.global_api().session_count().await?;
448
449 let mut dbtx = self.db.begin_transaction_nc().await;
450 let next_pegin_tweak_idx = dbtx
451 .get_value(&NextPegInTweakIndexKey)
452 .await
453 .unwrap_or_default();
454 let claimed = dbtx
455 .find_by_prefix(&PegInTweakIndexPrefix)
456 .await
457 .filter_map(|(k, v)| async move {
458 if v.claimed.is_empty() {
459 None
460 } else {
461 Some(k.0)
462 }
463 })
464 .collect()
465 .await;
466 Ok(backup::WalletModuleBackup::new_v1(
467 session_count,
468 next_pegin_tweak_idx,
469 claimed,
470 ))
471 }
472
473 fn input_fee(
474 &self,
475 _amount: Amount,
476 _input: &<Self::Common as ModuleCommon>::Input,
477 ) -> Option<Amount> {
478 Some(self.cfg().fee_consensus.peg_in_abs)
479 }
480
481 fn output_fee(
482 &self,
483 _amount: Amount,
484 _output: &<Self::Common as ModuleCommon>::Output,
485 ) -> Option<Amount> {
486 Some(self.cfg().fee_consensus.peg_out_abs)
487 }
488
489 #[cfg(feature = "cli")]
490 async fn handle_cli_command(
491 &self,
492 args: &[std::ffi::OsString],
493 ) -> anyhow::Result<serde_json::Value> {
494 cli::handle_cli_command(self, args).await
495 }
496}
497
/// Context made available to the wallet module's state machines; built by
/// `WalletClientModule::context`.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Bitcoin RPC client.
    rpc: DynBitcoindRpc,
    /// The peg-in descriptor from the module configuration.
    wallet_descriptor: PegInDescriptor,
    /// Decoder for this module's encoded types.
    wallet_decoder: Decoder,
    /// Secp256k1 context.
    secp: Secp256k1<All>,
    /// Client context of the owning wallet module.
    pub client_ctx: ClientContext<WalletClientModule>,
}
506
/// Registers `WalletClientContext` as a state machine context for a specific
/// module kind.
impl Context for WalletClientContext {
    // The right-hand `KIND` is the module-level constant in scope.
    // NOTE(review): presumably the wallet `KIND` re-exported from
    // `fedimint_wallet_common` — confirm.
    const KIND: Option<ModuleKind> = Some(KIND);
}
510
511impl WalletClientModule {
512 fn cfg(&self) -> &WalletClientConfig {
513 &self.data.cfg
514 }
515
516 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
517 if let Ok(rpc_config) = BitcoinRpcConfig::get_defaults_from_env_vars() {
518 if rpc_config.kind == "bitcoind" {
521 cfg.default_bitcoin_rpc.clone()
522 } else {
523 rpc_config
524 }
525 } else {
526 cfg.default_bitcoin_rpc.clone()
527 }
528 }
529
530 pub fn get_network(&self) -> Network {
531 self.cfg().network.0
532 }
533
534 pub fn get_fee_consensus(&self) -> FeeConsensus {
535 self.cfg().fee_consensus
536 }
537
538 async fn allocate_deposit_address_inner(
539 &self,
540 dbtx: &mut DatabaseTransaction<'_>,
541 ) -> (OperationId, Address, TweakIdx) {
542 dbtx.ensure_isolated().expect("Must be isolated db");
543
544 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
545 let (_secret_tweak_key, _, address, operation_id) =
546 self.data.derive_deposit_address(tweak_idx);
547
548 let now = fedimint_core::time::now();
549
550 dbtx.insert_new_entry(
551 &PegInTweakIndexKey(tweak_idx),
552 &PegInTweakIndexData {
553 creation_time: now,
554 next_check_time: Some(now),
555 last_check_time: None,
556 operation_id,
557 claimed: vec![],
558 },
559 )
560 .await;
561
562 (operation_id, address, tweak_idx)
563 }
564
565 pub async fn get_withdraw_fees(
572 &self,
573 address: &bitcoin::Address,
574 amount: bitcoin::Amount,
575 ) -> anyhow::Result<PegOutFees> {
576 self.module_api
577 .fetch_peg_out_fees(address, amount)
578 .await?
579 .context("Federation didn't return peg-out fees")
580 }
581
582 pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
584 Ok(self.module_api.fetch_wallet_summary().await?)
585 }
586
587 pub fn create_withdraw_output(
588 &self,
589 operation_id: OperationId,
590 address: bitcoin::Address,
591 amount: bitcoin::Amount,
592 fees: PegOutFees,
593 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
594 let output = WalletOutput::new_v0_peg_out(address, amount, fees);
595
596 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
597
598 let sm_gen = move |out_point_range: OutPointRange| {
599 assert_eq!(out_point_range.count(), 1);
600 let out_idx = out_point_range.start_idx();
601 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
602 operation_id,
603 state: WithdrawStates::Created(CreatedWithdrawState {
604 fm_outpoint: OutPoint {
605 txid: out_point_range.txid(),
606 out_idx,
607 },
608 }),
609 })]
610 };
611
612 Ok(ClientOutputBundle::new(
613 vec![ClientOutput::<WalletOutput> { output, amount }],
614 vec![ClientOutputSM::<WalletClientStates> {
615 state_machines: Arc::new(sm_gen),
616 }],
617 ))
618 }
619
620 pub fn create_rbf_withdraw_output(
621 &self,
622 operation_id: OperationId,
623 rbf: &Rbf,
624 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
625 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
626
627 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
628
629 let sm_gen = move |out_point_range: OutPointRange| {
630 assert_eq!(out_point_range.count(), 1);
631 let out_idx = out_point_range.start_idx();
632 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
633 operation_id,
634 state: WithdrawStates::Created(CreatedWithdrawState {
635 fm_outpoint: OutPoint {
636 txid: out_point_range.txid(),
637 out_idx,
638 },
639 }),
640 })]
641 };
642
643 Ok(ClientOutputBundle::new(
644 vec![ClientOutput::<WalletOutput> { output, amount }],
645 vec![ClientOutputSM::<WalletClientStates> {
646 state_machines: Arc::new(sm_gen),
647 }],
648 ))
649 }
650
651 pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
652 Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
653 }
654
655 pub async fn supports_safe_deposit(&self) -> bool {
664 let mut dbtx = self.db.begin_transaction().await;
665
666 let already_verified_supports_safe_deposit =
667 dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
668
669 already_verified_supports_safe_deposit || {
670 match self.module_api.module_consensus_version().await {
671 Ok(module_consensus_version) => {
672 let supported_version =
673 SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
674
675 if supported_version {
676 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
677 dbtx.commit_tx().await;
678 }
679
680 supported_version
681 }
682 Err(_) => false,
683 }
684 }
685 }
686
687 pub async fn safe_allocate_deposit_address<M>(
695 &self,
696 extra_meta: M,
697 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
698 where
699 M: Serialize + MaybeSend + MaybeSync,
700 {
701 ensure!(
702 self.supports_safe_deposit().await,
703 "Wallet module consensus version doesn't support safe deposits",
704 );
705
706 self.allocate_deposit_address_expert_only(extra_meta).await
707 }
708
    /// Allocates a new deposit address and records a deposit operation for
    /// it.
    ///
    /// Unlike `safe_allocate_deposit_address`, this does not check whether
    /// the federation supports safe deposits.
    ///
    /// `extra_meta` is serialized to JSON and stored with the operation.
    /// Returns the operation id, the deposit address and its tweak index.
    ///
    /// # Errors
    ///
    /// Fails if starting the operation, registering the script with the
    /// bitcoin RPC, or committing the database transaction fails.
    ///
    /// # Panics
    ///
    /// Panics if `extra_meta` cannot be serialized to JSON.
    pub async fn allocate_deposit_address_expert_only<M>(
        &self,
        extra_meta: M,
    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
    where
        M: Serialize + MaybeSend + MaybeSync,
    {
        let extra_meta_value =
            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
        let (operation_id, address, tweak_idx) = self
            .db
            .autocommit(
                move |dbtx, _| {
                    // Cloned per invocation so the closure stays callable
                    // more than once.
                    let extra_meta_value_inner = extra_meta_value.clone();
                    Box::pin(async move {
                        let (operation_id, address, tweak_idx) = self
                            .allocate_deposit_address_inner(dbtx)
                            .await;

                        // Record the deposit operation in the same
                        // transaction as the index entry.
                        self.client_ctx.manual_operation_start_dbtx(
                            dbtx,
                            operation_id,
                            WalletCommonInit::KIND.as_str(),
                            WalletOperationMeta {
                                variant: WalletOperationMetaVariant::Deposit {
                                    address: address.clone().into_unchecked(),
                                    tweak_idx: Some(tweak_idx),
                                    expires_at: None,
                                },
                                extra_meta: extra_meta_value_inner,
                            },
                            vec![]
                        ).await?;

                        debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");

                        // Start watching the address before the transaction
                        // commits.
                        self.rpc
                            .watch_script_history(&address.script_pubkey())
                            .await?;

                        // Wake the peg-in monitor only once the new entry is
                        // committed; a failed send is ignored.
                        let sender = self.pegin_monitor_wakeup_sender.clone();
                        dbtx.on_commit(move || {
                            let _ = sender.send(());
                        });

                        Ok((operation_id, address, tweak_idx))
                    })
                },
                // NOTE(review): presumably the maximum number of commit
                // attempts — confirm against `Database::autocommit`.
                Some(100),
            )
            .await
            // Flatten both autocommit failure modes into one `anyhow` error.
            .map_err(|e| match e {
                AutocommitError::CommitFailed {
                    last_error,
                    attempts,
                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
                AutocommitError::ClosureError { error, .. } => error,
            })?;

        Ok((operation_id, address, tweak_idx))
    }
788
    /// Returns the final outcome of a deposit operation, or a stream of
    /// [`DepositStateV2`] updates while it is still in progress.
    ///
    /// Operations stored without a tweak index cannot be followed live; for
    /// those, the stored `DepositStateV1` outcome is converted and returned.
    ///
    /// # Errors
    ///
    /// Fails if the operation does not exist, is not a wallet deposit
    /// operation, its address is for a different network, or it has no tweak
    /// index and no final stored outcome.
    pub async fn subscribe_deposit(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
        let operation = self
            .client_ctx
            .get_operation(operation_id)
            .await
            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;

        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
            bail!("Operation is not a wallet operation");
        }

        let operation_meta = operation.meta::<WalletOperationMeta>();

        let WalletOperationMetaVariant::Deposit {
            address, tweak_idx, ..
        } = operation_meta.variant
        else {
            bail!("Operation is not a deposit operation");
        };

        let address = address.require_network(self.cfg().network.0)?;

        // No tweak index recorded: only a stored final outcome can be
        // reported.
        let Some(tweak_idx) = tweak_idx else {
            let outcome_v1 = operation
                .outcome::<DepositStateV1>()
                .context("Old pending deposit, can't subscribe to updates")?;

            let outcome_v2 = match outcome_v1 {
                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
                    // NOTE(review): indexing panics if `out_idx` is out of
                    // range for the stored transaction.
                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
                    btc_out_point: bitcoin::OutPoint {
                        txid: tx_info.btc_transaction.compute_txid(),
                        vout: tx_info.out_idx,
                    },
                },
                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
                _ => bail!("Non-final outcome in operation log"),
            };

            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
        };

        Ok(self.client_ctx.outcome_or_updates(&operation, operation_id, || {
            let stream_rpc = self.rpc.clone();
            let stream_client_ctx = self.client_ctx.clone();
            let stream_script_pub_key = address.script_pubkey();

            stream! {
                yield DepositStateV2::WaitingForTransaction;

                // Make sure the script is being watched before polling its
                // history.
                retry(
                    "subscribe script history",
                    background_backoff(),
                    || stream_rpc.watch_script_history(&stream_script_pub_key)
                ).await.expect("Will never give up");
                // Retry until the first transaction in the script history
                // has an output paying to the deposit script.
                let (btc_out_point, btc_deposited) = retry(
                    "fetch history",
                    background_backoff(),
                    || async {
                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
                        // Only the first history entry is considered.
                        history.first().and_then(|tx| {
                            let (out_idx, amount) = tx.output
                                .iter()
                                .enumerate()
                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
                            let txid = tx.compute_txid();

                            Some((
                                bitcoin::OutPoint {
                                    txid,
                                    vout: out_idx as u32,
                                },
                                amount
                            ))
                        }).context("No deposit transaction found")
                    }
                ).await.expect("Will never give up");

                yield DepositStateV2::WaitingForConfirmation {
                    btc_deposited,
                    btc_out_point
                };

                // Wait until a claim record for this out point appears in
                // the module database.
                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
                    peg_in_index: tweak_idx,
                    btc_out_point,
                }).await;

                yield DepositStateV2::Confirmed {
                    btc_deposited,
                    btc_out_point
                };

                // The deposit counts as claimed once the claim's change
                // outputs have been awaited successfully.
                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
                    Ok(()) => yield DepositStateV2::Claimed {
                        btc_deposited,
                        btc_out_point
                    },
                    Err(e) => yield DepositStateV2::Failed(e.to_string())
                }
            }
        }))
    }
904
905 pub async fn find_tweak_idx_by_address(
906 &self,
907 address: bitcoin::Address<NetworkUnchecked>,
908 ) -> anyhow::Result<TweakIdx> {
909 let data = self.data.clone();
910 let Some((tweak_idx, _)) = self
911 .db
912 .begin_transaction_nc()
913 .await
914 .find_by_prefix(&PegInTweakIndexPrefix)
915 .await
916 .filter(|(k, _)| {
917 let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
918 future::ready(derived_address.into_unchecked() == address)
919 })
920 .next()
921 .await
922 else {
923 bail!("Address not found in the list of derived keys");
924 };
925
926 Ok(tweak_idx.0)
927 }
928 pub async fn find_tweak_idx_by_operation_id(
929 &self,
930 operation_id: OperationId,
931 ) -> anyhow::Result<TweakIdx> {
932 Ok(self
933 .client_ctx
934 .module_db()
935 .clone()
936 .begin_transaction_nc()
937 .await
938 .find_by_prefix(&PegInTweakIndexPrefix)
939 .await
940 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
941 .next()
942 .await
943 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
944 .0
945 .0)
946 }
947
948 pub async fn get_pegin_tweak_idx(
949 &self,
950 tweak_idx: TweakIdx,
951 ) -> anyhow::Result<PegInTweakIndexData> {
952 self.client_ctx
953 .module_db()
954 .clone()
955 .begin_transaction_nc()
956 .await
957 .get_value(&PegInTweakIndexKey(tweak_idx))
958 .await
959 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
960 }
961
962 pub async fn get_claimed_pegins(
963 &self,
964 dbtx: &mut DatabaseTransaction<'_>,
965 tweak_idx: TweakIdx,
966 ) -> Vec<(
967 bitcoin::OutPoint,
968 TransactionId,
969 Vec<fedimint_core::OutPoint>,
970 )> {
971 let outpoints = dbtx
972 .get_value(&PegInTweakIndexKey(tweak_idx))
973 .await
974 .map(|v| v.claimed)
975 .unwrap_or_default();
976
977 let mut res = vec![];
978
979 for outpoint in outpoints {
980 let claimed_peg_in_data = dbtx
981 .get_value(&ClaimedPegInKey {
982 peg_in_index: tweak_idx,
983 btc_out_point: outpoint,
984 })
985 .await
986 .expect("Must have a corresponding claim record");
987 res.push((
988 outpoint,
989 claimed_peg_in_data.claim_txid,
990 claimed_peg_in_data.change,
991 ));
992 }
993
994 res
995 }
996
997 pub async fn recheck_pegin_address_by_op_id(
999 &self,
1000 operation_id: OperationId,
1001 ) -> anyhow::Result<()> {
1002 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1003
1004 self.recheck_pegin_address(tweak_idx).await
1005 }
1006
1007 pub async fn recheck_pegin_address_by_address(
1009 &self,
1010 address: bitcoin::Address<NetworkUnchecked>,
1011 ) -> anyhow::Result<()> {
1012 self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1013 .await
1014 }
1015
    /// Schedules an immediate re-check of the deposit address at `tweak_idx`.
    ///
    /// Sets the entry's `next_check_time` to now and, once the change is
    /// committed, wakes the peg-in monitor.
    ///
    /// # Errors
    ///
    /// Fails if there is no entry for `tweak_idx` or the transaction cannot
    /// be committed.
    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async {
                        let db_key = PegInTweakIndexKey(tweak_idx);
                        let db_val = dbtx
                            .get_value(&db_key)
                            .await
                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;

                        // Keep every other field, only move the next check
                        // to now.
                        dbtx.insert_entry(
                            &db_key,
                            &PegInTweakIndexData {
                                next_check_time: Some(fedimint_core::time::now()),
                                ..db_val
                            },
                        )
                        .await;

                        // Notify the monitor only after a successful commit;
                        // a failed send is ignored.
                        let sender = self.pegin_monitor_wakeup_sender.clone();
                        dbtx.on_commit(move || {
                            let _ = sender.send(());
                        });

                        Ok::<_, anyhow::Error>(())
                    })
                },
                // NOTE(review): presumably the maximum number of commit
                // attempts — confirm against `Database::autocommit`.
                Some(100),
            )
            .await?;

        Ok(())
    }
1051
1052 pub async fn await_num_deposits_by_operation_id(
1054 &self,
1055 operation_id: OperationId,
1056 num_deposits: usize,
1057 ) -> anyhow::Result<()> {
1058 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1059 self.await_num_deposits(tweak_idx, num_deposits).await
1060 }
1061
1062 pub async fn await_num_deposits_by_address(
1063 &self,
1064 address: bitcoin::Address<NetworkUnchecked>,
1065 num_deposits: usize,
1066 ) -> anyhow::Result<()> {
1067 self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1068 .await
1069 }
1070
1071 #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1072 pub async fn await_num_deposits(
1073 &self,
1074 tweak_idx: TweakIdx,
1075 num_deposits: usize,
1076 ) -> anyhow::Result<()> {
1077 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1078
1079 let mut receiver = self.pegin_claimed_receiver.clone();
1080 let mut backoff = backoff_util::aggressive_backoff();
1081
1082 loop {
1083 let pegins = self
1084 .get_claimed_pegins(
1085 &mut self.client_ctx.module_db().begin_transaction_nc().await,
1086 tweak_idx,
1087 )
1088 .await;
1089
1090 if pegins.len() < num_deposits {
1091 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1092 self.recheck_pegin_address(tweak_idx).await?;
1093 runtime::sleep(backoff.next().unwrap_or_default()).await;
1094 receiver.changed().await?;
1095 continue;
1096 }
1097
1098 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1099
1100 for (_outpoint, transaction_id, change) in pegins {
1101 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1102 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1103
1104 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1105 bail!("{}", e);
1106 }
1107
1108 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1109 self.client_ctx
1110 .await_primary_module_outputs(operation_id, change)
1111 .await
1112 .expect("Cannot fail if tx was accepted and federation is honest");
1113 }
1114
1115 return Ok(());
1116 }
1117 }
1118
1119 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1124 &self,
1125 address: &bitcoin::Address,
1126 amount: bitcoin::Amount,
1127 fee: PegOutFees,
1128 extra_meta: M,
1129 ) -> anyhow::Result<OperationId> {
1130 {
1131 let operation_id = OperationId(thread_rng().gen());
1132
1133 let withdraw_output =
1134 self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1135 let tx_builder = TransactionBuilder::new()
1136 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1137
1138 let extra_meta =
1139 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1140 self.client_ctx
1141 .finalize_and_submit_transaction(
1142 operation_id,
1143 WalletCommonInit::KIND.as_str(),
1144 {
1145 let address = address.clone();
1146 move |change_range: OutPointRange| WalletOperationMeta {
1147 variant: WalletOperationMetaVariant::Withdraw {
1148 address: address.clone().into_unchecked(),
1149 amount,
1150 fee,
1151 change: change_range.into_iter().collect(),
1152 },
1153 extra_meta: extra_meta.clone(),
1154 }
1155 },
1156 tx_builder,
1157 )
1158 .await?;
1159
1160 Ok(operation_id)
1161 }
1162 }
1163
1164 #[deprecated(
1169 since = "0.4.0",
1170 note = "RBF withdrawals are rejected by the federation"
1171 )]
1172 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1173 &self,
1174 rbf: Rbf,
1175 extra_meta: M,
1176 ) -> anyhow::Result<OperationId> {
1177 let operation_id = OperationId(thread_rng().gen());
1178
1179 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1180 let tx_builder = TransactionBuilder::new()
1181 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1182
1183 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1184 self.client_ctx
1185 .finalize_and_submit_transaction(
1186 operation_id,
1187 WalletCommonInit::KIND.as_str(),
1188 move |change_range: OutPointRange| WalletOperationMeta {
1189 variant: WalletOperationMetaVariant::RbfWithdraw {
1190 rbf: rbf.clone(),
1191 change: change_range.into_iter().collect(),
1192 },
1193 extra_meta: extra_meta.clone(),
1194 },
1195 tx_builder,
1196 )
1197 .await?;
1198
1199 Ok(operation_id)
1200 }
1201
1202 pub async fn subscribe_withdraw_updates(
1203 &self,
1204 operation_id: OperationId,
1205 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1206 let operation = self
1207 .client_ctx
1208 .get_operation(operation_id)
1209 .await
1210 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1211
1212 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1213 bail!("Operation is not a wallet operation");
1214 }
1215
1216 let operation_meta = operation.meta::<WalletOperationMeta>();
1217
1218 let (WalletOperationMetaVariant::Withdraw { change, .. }
1219 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1220 else {
1221 bail!("Operation is not a withdraw operation");
1222 };
1223
1224 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1225 let client_ctx = self.client_ctx.clone();
1226
1227 Ok(self
1228 .client_ctx
1229 .outcome_or_updates(&operation, operation_id, || {
1230 stream! {
1231 match next_withdraw_state(&mut operation_stream).await {
1232 Some(WithdrawStates::Created(_)) => {
1233 yield WithdrawState::Created;
1234 },
1235 Some(s) => {
1236 panic!("Unexpected state {s:?}")
1237 },
1238 None => return,
1239 }
1240
1241 let _ = client_ctx
1246 .await_primary_module_outputs(operation_id, change)
1247 .await;
1248
1249
1250 match next_withdraw_state(&mut operation_stream).await {
1251 Some(WithdrawStates::Aborted(inner)) => {
1252 yield WithdrawState::Failed(inner.error);
1253 },
1254 Some(WithdrawStates::Success(inner)) => {
1255 yield WithdrawState::Succeeded(inner.txid);
1256 },
1257 Some(s) => {
1258 panic!("Unexpected state {s:?}")
1259 },
1260 None => {},
1261 }
1262 }
1263 }))
1264 }
1265
1266 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1267 self.admin_auth
1268 .clone()
1269 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1270 }
1271
1272 pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1273 self.module_api
1274 .activate_consensus_version_voting(self.admin_auth()?)
1275 .await?;
1276
1277 Ok(())
1278 }
1279}
1280
1281async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1284 loop {
1285 let mut dbtx = db.begin_transaction().await;
1286
1287 if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1288 break;
1289 }
1290
1291 if let Ok(module_consensus_version) = module_api.module_consensus_version().await {
1292 if SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version {
1293 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1294 dbtx.commit_tx().await;
1295 break;
1296 }
1297 }
1298
1299 drop(dbtx);
1300
1301 if is_running_in_test_env() {
1302 sleep(Duration::from_secs(1)).await;
1303 } else {
1304 sleep(Duration::from_secs(3600)).await;
1305 }
1306 }
1307}
1308
1309async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1311 let index = dbtx
1312 .get_value(&NextPegInTweakIndexKey)
1313 .await
1314 .unwrap_or_default();
1315 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1316 .await;
1317 index
1318}
1319
/// State type of the wallet client module.
///
/// Wraps the state machines of the individual wallet operations so that they
/// can be driven through a single [`State`] implementation.
// NOTE: the type derives `Encodable`/`Decodable`; presumably the variant order
// is part of the encoding, so treat reordering as a breaking change - confirm.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
    /// State of a deposit state machine.
    Deposit(DepositStateMachine),
    /// State of a withdraw state machine.
    Withdraw(WithdrawStateMachine),
}
1325
1326impl IntoDynInstance for WalletClientStates {
1327 type DynType = DynState;
1328
1329 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1330 DynState::from_typed(instance_id, self)
1331 }
1332}
1333
1334impl State for WalletClientStates {
1335 type ModuleContext = WalletClientContext;
1336
1337 fn transitions(
1338 &self,
1339 context: &Self::ModuleContext,
1340 global_context: &DynGlobalClientContext,
1341 ) -> Vec<StateTransition<Self>> {
1342 match self {
1343 WalletClientStates::Deposit(sm) => {
1344 sm_enum_variant_translation!(
1345 sm.transitions(context, global_context),
1346 WalletClientStates::Deposit
1347 )
1348 }
1349 WalletClientStates::Withdraw(sm) => {
1350 sm_enum_variant_translation!(
1351 sm.transitions(context, global_context),
1352 WalletClientStates::Withdraw
1353 )
1354 }
1355 }
1356 }
1357
1358 fn operation_id(&self) -> OperationId {
1359 match self {
1360 WalletClientStates::Deposit(sm) => sm.operation_id(),
1361 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1362 }
1363 }
1364}
1365
// Unit tests for the recovery index scan; not built for wasm targets.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        recover_scan_idxes_for_activity, RecoverScanOutcome, RECOVER_NUM_IDX_ADD_TO_LAST_USED,
    };

    // Exercises `recover_scan_idxes_for_activity` with hand-written activity
    // callbacks. Each callback receives the tweak index being checked and
    // returns the activity found there (an empty `Vec` means "none"); a
    // `panic!` arm marks an index the scan must never ask about.
    #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Scenario 1: start at index 0, nothing used before, no activity on any
        // index the scan reaches. Index 9 must be checked, index 10 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(9) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(10) => panic!("Shouldn't happen"),
                            TweakIdx(11) => {
                                // Gives the otherwise empty `vec![]` arms a
                                // concrete element type.
                                vec![0usize] }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            // Proves that the scan really looked at index 9.
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Scenario 2: indexes 1 and 2 are passed in as already used. They must
        // not be checked again, index 11 must be checked and index 12 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                            TweakIdx(11) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(12) => panic!("Shouldn't happen"),
                            TweakIdx(13) => {
                                // Gives the otherwise empty `vec![]` arms a
                                // concrete element type.
                                vec![0usize] }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(2)),
                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            // Proves that the scan really looked at index 11.
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Scenario 3: start at index 10, which has activity. Index 19 must be
        // checked, index 20 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(10),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(10) => vec![()],
                            TweakIdx(19) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(20) => panic!("Shouldn't happen"),
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(10)),
                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
                }
            );
            // Proves that the scan really looked at index 19.
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Scenario 4: start at index 0 with activity on indexes 6 and 15; both
        // must be reported and 15 becomes the last used index.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(6 | 15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
            }
        );
        // Scenario 5: start at index 10 with no activity from there on. Indexes
        // before the start (8 and 9) must not influence the result; checking
        // index 9 would panic.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(8) => {
                        // Gives the otherwise empty `vec![]` arms a concrete
                        // element type.
                        vec![()] }
                    TweakIdx(9) => {
                        panic!("Shouldn't happen")
                    }
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([])
            }
        );
        // Scenario 6: start at index 10 with activity on index 15 only; index 9
        // (before the start) must never be checked.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    TweakIdx(15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
            }
        );
    }
}