#![warn(clippy::pedantic)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::default_trait_access)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::missing_panics_doc)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::must_use_candidate)]
pub mod api;
#[cfg(feature = "cli")]
mod cli;
mod backup;
pub mod client_db;
mod deposit;
mod pegin_monitor;
mod withdraw;
use std::collections::{BTreeMap, BTreeSet};
use std::future;
use std::sync::Arc;
use std::time::SystemTime;
use anyhow::{anyhow, bail, ensure, Context as AnyhowContext};
use async_stream::stream;
use backup::WalletModuleBackup;
use bitcoin::address::NetworkUnchecked;
use bitcoin::{Address, Network, ScriptBuf};
use client_db::{DbKeyPrefix, PegInTweakIndexKey, TweakIdx};
use fedimint_api_client::api::DynModuleApi;
use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
use fedimint_client::derivable_secret::{ChildId, DerivableSecret};
use fedimint_client::module::init::{
ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
};
use fedimint_client::module::recovery::RecoveryProgress;
use fedimint_client::module::{ClientContext, ClientModule, IClientModule};
use fedimint_client::oplog::UpdateStreamOrOutcome;
use fedimint_client::sm::util::MapStateTransitions;
use fedimint_client::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
use fedimint_client::transaction::{ClientOutput, TransactionBuilder};
use fedimint_client::{sm_enum_variant_translation, DynGlobalClientContext};
use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, OperationId};
use fedimint_core::db::{
AutocommitError, Database, DatabaseTransaction, DatabaseVersion,
IDatabaseTransactionOpsCoreTyped,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::envs::BitcoinRpcConfig;
use fedimint_core::module::{
ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
};
use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup};
use fedimint_core::{
apply, async_trait_maybe_send, push_db_pair_items, Amount, OutPoint, TransactionId,
};
use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
use fedimint_wallet_common::tweakable::Tweakable;
pub use fedimint_wallet_common::*;
use futures::{Future, Stream, StreamExt};
use rand::{thread_rng, Rng};
use secp256k1::{All, KeyPair, Secp256k1, SECP256K1};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio::sync::watch;
use tracing::{debug, instrument};
use crate::api::WalletFederationApi;
use crate::client_db::{
ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey,
};
use crate::deposit::DepositStateMachine;
use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
pub btc_transaction: bitcoin::Transaction,
pub out_idx: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositState {
WaitingForTransaction,
WaitingForConfirmation(BitcoinTransactionData),
Confirmed(BitcoinTransactionData),
Claimed(BitcoinTransactionData),
Failed(String),
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
Created,
Succeeded(bitcoin::Txid),
Failed(String),
}
async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
where
S: Stream<Item = WalletClientStates> + Unpin,
{
loop {
if let WalletClientStates::Withdraw(ds) = stream.next().await? {
return Some(ds.state);
}
tokio::task::yield_now().await;
}
}
#[derive(Debug, Clone, Default)]
pub struct WalletClientInit(pub Option<BitcoinRpcConfig>);
impl WalletClientInit {
pub fn new(rpc: BitcoinRpcConfig) -> Self {
Self(Some(rpc))
}
}
impl ModuleInit for WalletClientInit {
type Common = WalletCommonInit;
const DATABASE_VERSION: DatabaseVersion = DatabaseVersion(0);
async fn dump_database(
&self,
dbtx: &mut DatabaseTransaction<'_>,
prefix_names: Vec<String>,
) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
BTreeMap::new();
let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
});
for table in filtered_prefixes {
match table {
DbKeyPrefix::NextPegInTweakIndex => {
if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
wallet_client_items
.insert("NextPegInTweakIndex".to_string(), Box::new(index));
}
}
DbKeyPrefix::PegInTweakIndex => {
push_db_pair_items!(
dbtx,
PegInTweakIndexPrefix,
PegInTweakIndexKey,
PegInTweakIndexData,
wallet_client_items,
"Peg-In Tweak Index"
);
}
DbKeyPrefix::ClaimedPegIn => {
push_db_pair_items!(
dbtx,
ClaimedPegInPrefix,
ClaimedPegInKey,
ClaimedPegInData,
wallet_client_items,
"Claimed Peg-In"
);
}
DbKeyPrefix::RecoveryFinalized => {}
}
}
Box::new(wallet_client_items.into_iter())
}
}
#[apply(async_trait_maybe_send!)]
impl ClientModuleInit for WalletClientInit {
type Module = WalletClientModule;
fn supported_api_versions(&self) -> MultiApiVersion {
MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
.expect("no version conflicts")
}
async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
let data = WalletClientModuleData {
cfg: args.cfg().clone(),
module_root_secret: args.module_root_secret().clone(),
};
let rpc_config = self
.0
.clone()
.unwrap_or(WalletClientModule::get_rpc_config(args.cfg()));
let db = args.db().clone();
let btc_rpc = create_bitcoind(&rpc_config, TaskGroup::new().make_handle())?;
let module_api = args.module_api().clone();
let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
Ok(WalletClientModule {
db,
data,
module_api,
notifier: args.notifier().clone(),
rpc: btc_rpc,
client_ctx: args.context(),
pegin_monitor_wakeup_sender,
pegin_monitor_wakeup_receiver,
pegin_claimed_receiver,
pegin_claimed_sender,
task_group: args.task_group().clone(),
})
}
async fn recover(
&self,
args: &ClientModuleRecoverArgs<Self>,
snapshot: Option<&<Self::Module as ClientModule>::Backup>,
) -> anyhow::Result<()> {
let db = args.db().clone();
if db
.begin_transaction_nc()
.await
.get_value(&RecoveryFinalizedKey)
.await
.unwrap_or_default()
{
debug!(target: LOG_CLIENT_MODULE_WALLET, max_gap=RECOVER_MAX_GAP, "Recovery already complete before");
return Ok(());
}
#[allow(clippy::single_match_else)]
let previous_next_unused_idx = match snapshot {
Some(WalletModuleBackup::V0(backup)) => {
debug!(target: LOG_CLIENT_MODULE_WALLET, "Restoring starting from an existing backup");
backup.next_tweak_idx
}
_ => {
debug!(target: LOG_CLIENT_MODULE_WALLET, "Restoring without an existing backup");
TweakIdx(0)
}
};
let rpc_config = self
.0
.clone()
.unwrap_or(WalletClientModule::get_rpc_config(args.cfg()));
let btc_rpc = create_bitcoind(&rpc_config, TaskGroup::new().make_handle())?;
let btc_rpc = &btc_rpc;
let data = WalletClientModuleData {
cfg: args.cfg().clone(),
module_root_secret: args.module_root_secret().clone(),
};
let data = &data;
let RecoverScanOutcome { last_used_idx, new_start_idx, tweak_idxes_with_pegins} = recover_scan_idxes_for_activity(previous_next_unused_idx, move |cur_tweak_idx: TweakIdx| async move {
args.update_recovery_progress(RecoveryProgress {
complete: u32::try_from(cur_tweak_idx.0).unwrap_or(u32::MAX),
total: u32::try_from(cur_tweak_idx.0.saturating_add(RECOVER_MAX_GAP)).unwrap_or(u32::MAX),
});
let (script, address, _tweak_key, _operation_id) =
data.derive_peg_in_script(cur_tweak_idx);
btc_rpc.watch_script_history(&script).await?;
let history = btc_rpc.get_script_history(&script).await?;
debug!(target: LOG_CLIENT_MODULE_WALLET, %cur_tweak_idx, %address, history_len=history.len(), "Checked address");
Ok(history)
}).await?;
let now = fedimint_core::time::now();
let mut dbtx = db.begin_transaction().await;
let mut tweak_idx = TweakIdx(0);
while tweak_idx < new_start_idx {
let (_script, _address, _tweak_key, operation_id) =
data.derive_peg_in_script(tweak_idx);
dbtx.insert_new_entry(
&PegInTweakIndexKey(tweak_idx),
&PegInTweakIndexData {
creation_time: now,
next_check_time: if tweak_idxes_with_pegins.contains(&tweak_idx) {
Some(now)
} else {
None
},
last_check_time: None,
operation_id,
claimed: vec![],
},
)
.await;
tweak_idx = tweak_idx.next();
}
dbtx.insert_new_entry(&NextPegInTweakIndexKey, &new_start_idx)
.await;
dbtx.insert_entry(&RecoveryFinalizedKey, &true).await;
dbtx.commit_tx().await;
debug!(target: LOG_CLIENT_MODULE_WALLET, %new_start_idx, ?last_used_idx, "Recovery complete");
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
pub variant: WalletOperationMetaVariant,
pub extra_meta: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
Deposit {
address: bitcoin::Address<NetworkUnchecked>,
expires_at: SystemTime,
},
Withdraw {
address: bitcoin::Address<NetworkUnchecked>,
#[serde(with = "bitcoin::amount::serde::as_sat")]
amount: bitcoin::Amount,
fee: PegOutFees,
change: Vec<OutPoint>,
},
RbfWithdraw {
rbf: Rbf,
change: Vec<OutPoint>,
},
}
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
cfg: WalletClientConfig,
module_root_secret: DerivableSecret,
}
impl WalletClientModuleData {
fn derive_deposit_address(
&self,
idx: TweakIdx,
) -> (
secp256k1::KeyPair,
secp256k1::PublicKey,
Address,
OperationId,
) {
let idx = ChildId(idx.0);
let secret_tweak_key = self
.module_root_secret
.child_key(WALLET_TWEAK_CHILD_ID)
.child_key(idx)
.to_secp_key(secp256k1::SECP256K1);
let public_tweak_key = secret_tweak_key.public_key();
let address = self
.cfg
.peg_in_descriptor
.tweak(&public_tweak_key, secp256k1::SECP256K1)
.address(self.cfg.network)
.unwrap();
let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
(secret_tweak_key, public_tweak_key, address, operation_id)
}
fn derive_peg_in_script(
&self,
idx: TweakIdx,
) -> (ScriptBuf, bitcoin::Address, KeyPair, OperationId) {
let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
(
self.cfg
.peg_in_descriptor
.tweak(&secret_tweak_key.public_key(), SECP256K1)
.script_pubkey(),
address,
secret_tweak_key,
operation_id,
)
}
}
#[derive(Debug)]
pub struct WalletClientModule {
data: WalletClientModuleData,
db: Database,
module_api: DynModuleApi,
notifier: ModuleNotifier<WalletClientStates>,
rpc: DynBitcoindRpc,
client_ctx: ClientContext<Self>,
pegin_monitor_wakeup_sender: watch::Sender<()>,
pegin_monitor_wakeup_receiver: watch::Receiver<()>,
pegin_claimed_sender: watch::Sender<()>,
pegin_claimed_receiver: watch::Receiver<()>,
task_group: TaskGroup,
}
#[apply(async_trait_maybe_send!)]
impl ClientModule for WalletClientModule {
type Init = WalletClientInit;
type Common = WalletModuleTypes;
type Backup = WalletModuleBackup;
type ModuleStateMachineContext = WalletClientContext;
type States = WalletClientStates;
fn context(&self) -> Self::ModuleStateMachineContext {
WalletClientContext {
rpc: self.rpc.clone(),
wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
wallet_decoder: self.decoder(),
secp: Default::default(),
}
}
async fn start(&self) {
self.task_group.spawn_cancellable("peg-in monitor", {
let client_ctx = self.client_ctx.clone();
let db = self.db.clone();
let btc_rpc = self.rpc.clone();
let module_api = self.module_api.clone();
let data = self.data.clone();
let pegin_claimed_sender = self.pegin_claimed_sender.clone();
let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
pegin_monitor::run_peg_in_monitor(
client_ctx,
db,
btc_rpc,
module_api,
data,
pegin_claimed_sender,
pegin_monitor_wakeup_receiver,
)
});
}
fn supports_backup(&self) -> bool {
true
}
async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
let session_count = self.client_ctx.global_api().session_count().await?;
Ok(backup::WalletModuleBackup::new_v0(
session_count,
self.db
.begin_transaction_nc()
.await
.get_value(&NextPegInTweakIndexKey)
.await
.unwrap_or_default(),
))
}
fn input_fee(&self, _input: &<Self::Common as ModuleCommon>::Input) -> Option<Amount> {
Some(self.cfg().fee_consensus.peg_in_abs)
}
fn output_fee(&self, _output: &<Self::Common as ModuleCommon>::Output) -> Option<Amount> {
Some(self.cfg().fee_consensus.peg_out_abs)
}
#[cfg(feature = "cli")]
async fn handle_cli_command(
&self,
args: &[std::ffi::OsString],
) -> anyhow::Result<serde_json::Value> {
cli::handle_cli_command(self, args).await
}
}
#[derive(Debug, Clone)]
pub struct WalletClientContext {
rpc: DynBitcoindRpc,
wallet_descriptor: PegInDescriptor,
wallet_decoder: Decoder,
secp: Secp256k1<All>,
}
impl Context for WalletClientContext {}
impl WalletClientModule {
fn cfg(&self) -> &WalletClientConfig {
&self.data.cfg
}
fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
if let Ok(rpc_config) = BitcoinRpcConfig::get_defaults_from_env_vars() {
if rpc_config.kind == "bitcoind" {
cfg.default_bitcoin_rpc.clone()
} else {
rpc_config
}
} else {
cfg.default_bitcoin_rpc.clone()
}
}
pub fn get_network(&self) -> Network {
self.cfg().network
}
pub fn get_fee_consensus(&self) -> FeeConsensus {
self.cfg().fee_consensus
}
async fn allocate_deposit_address_inner(
&self,
dbtx: &mut DatabaseTransaction<'_>,
) -> (OperationId, Address, TweakIdx) {
let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
let (_secret_tweak_key, _, address, operation_id) =
self.data.derive_deposit_address(tweak_idx);
let now = fedimint_core::time::now();
dbtx.insert_new_entry(
&PegInTweakIndexKey(tweak_idx),
&PegInTweakIndexData {
creation_time: now,
next_check_time: Some(now),
last_check_time: None,
operation_id,
claimed: vec![],
},
)
.await;
(operation_id, address, tweak_idx)
}
pub async fn get_withdraw_fees(
&self,
address: bitcoin::Address<NetworkUnchecked>,
amount: bitcoin::Amount,
) -> anyhow::Result<PegOutFees> {
check_address(&address, self.cfg().network)?;
self.module_api
.fetch_peg_out_fees(&address.assume_checked(), amount)
.await?
.context("Federation didn't return peg-out fees")
}
pub fn create_withdraw_output(
&self,
operation_id: OperationId,
address: bitcoin::Address<NetworkUnchecked>,
amount: bitcoin::Amount,
fees: PegOutFees,
) -> anyhow::Result<ClientOutput<WalletOutput, WalletClientStates>> {
check_address(&address, self.cfg().network)?;
let output = WalletOutput::new_v0_peg_out(address, amount, fees);
let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
let sm_gen = move |txid, out_idx| {
vec![WalletClientStates::Withdraw(WithdrawStateMachine {
operation_id,
state: WithdrawStates::Created(CreatedWithdrawState {
fm_outpoint: OutPoint { txid, out_idx },
}),
})]
};
Ok(ClientOutput::<WalletOutput, WalletClientStates> {
output,
amount,
state_machines: Arc::new(sm_gen),
})
}
pub fn create_rbf_withdraw_output(
&self,
operation_id: OperationId,
rbf: &Rbf,
) -> anyhow::Result<ClientOutput<WalletOutput, WalletClientStates>> {
let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
let sm_gen = move |txid, out_idx| {
vec![WalletClientStates::Withdraw(WithdrawStateMachine {
operation_id,
state: WithdrawStates::Created(CreatedWithdrawState {
fm_outpoint: OutPoint { txid, out_idx },
}),
})]
};
Ok(ClientOutput::<WalletOutput, WalletClientStates> {
output,
amount,
state_machines: Arc::new(sm_gen),
})
}
pub async fn allocate_deposit_address_expert_only(
&self,
) -> anyhow::Result<(OperationId, Address, TweakIdx)> {
let (operation_id, address, tweak_idx) = self
.client_ctx
.module_autocommit(
|dbtx, _| {
Box::pin(async {
let (operation_id, address, tweak_idx) = self
.allocate_deposit_address_inner( &mut dbtx.module_dbtx())
.await;
debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
self.rpc
.watch_script_history(&address.script_pubkey())
.await?;
let sender = self.pegin_monitor_wakeup_sender.clone();
dbtx.module_dbtx().on_commit(move || {
let _ = sender.send(());
});
Ok((operation_id, address, tweak_idx))
})
},
Some(100),
)
.await
.map_err(|e| match e {
AutocommitError::CommitFailed {
last_error,
attempts,
} => last_error.context(format!("Failed to commit after {attempts} attempts")),
AutocommitError::ClosureError { error, .. } => error,
})?;
Ok((operation_id, address, tweak_idx))
}
pub async fn find_tweak_idx_by_operation_id(
&self,
operation_id: OperationId,
) -> anyhow::Result<TweakIdx> {
Ok(self
.client_ctx
.module_db()
.clone()
.begin_transaction_nc()
.await
.find_by_prefix(&PegInTweakIndexPrefix)
.await
.filter(|(_k, v)| future::ready(v.operation_id == operation_id))
.next()
.await
.ok_or_else(|| anyhow::format_err!("OperationId not found"))?
.0
.0)
}
pub async fn get_pegin_tweak_idx(
&self,
tweak_idx: TweakIdx,
) -> anyhow::Result<PegInTweakIndexData> {
self.client_ctx
.module_db()
.clone()
.begin_transaction_nc()
.await
.get_value(&PegInTweakIndexKey(tweak_idx))
.await
.ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
}
pub async fn get_claimed_pegins(
&self,
dbtx: &mut DatabaseTransaction<'_>,
tweak_idx: TweakIdx,
) -> Vec<(
bitcoin::OutPoint,
TransactionId,
Vec<fedimint_core::OutPoint>,
)> {
let outpoints = dbtx
.get_value(&PegInTweakIndexKey(tweak_idx))
.await
.map(|v| v.claimed)
.unwrap_or_default();
let mut res = vec![];
for outpoint in outpoints {
let claimed_peg_in_data = dbtx
.get_value(&ClaimedPegInKey {
peg_in_index: tweak_idx,
btc_out_point: outpoint,
})
.await
.expect("Must have a corresponding claim record");
res.push((
outpoint,
claimed_peg_in_data.claim_txid,
claimed_peg_in_data.change,
));
}
res
}
pub async fn recheck_pegin_address_by_op_id(
&self,
operation_id: OperationId,
) -> anyhow::Result<()> {
let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
self.recheck_pegin_address(tweak_idx).await
}
pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
self.client_ctx
.module_autocommit_2(
|dbtx, _| {
Box::pin(async {
let db_key = PegInTweakIndexKey(tweak_idx);
let db_val = dbtx
.module_dbtx()
.get_value(&db_key)
.await
.ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
dbtx.module_dbtx()
.insert_entry(
&db_key,
&PegInTweakIndexData {
next_check_time: Some(fedimint_core::time::now()),
..db_val
},
)
.await;
let sender = self.pegin_monitor_wakeup_sender.clone();
dbtx.module_dbtx().on_commit(move || {
let _ = sender.send(());
});
Ok(())
})
},
Some(100),
)
.await?;
Ok(())
}
pub async fn await_num_deposit_by_operation_id(
&self,
operation_id: OperationId,
num_deposits: usize,
) -> anyhow::Result<()> {
let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
self.await_num_deposits(tweak_idx, num_deposits).await
}
#[instrument(skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
pub async fn await_num_deposits(
&self,
tweak_idx: TweakIdx,
num_deposits: usize,
) -> anyhow::Result<()> {
let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
let mut receiver = self.pegin_claimed_receiver.clone();
loop {
let pegins = self
.get_claimed_pegins(
&mut self.client_ctx.module_db().begin_transaction_nc().await,
tweak_idx,
)
.await;
if pegins.len() < num_deposits {
debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
self.recheck_pegin_address(tweak_idx).await?;
receiver.changed().await?;
continue;
}
debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
for (_outpoint, transaction_id, change) in pegins {
debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
bail!("{}", e);
}
debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
self.client_ctx
.await_primary_module_outputs(operation_id, change)
.await
.expect("Cannot fail if tx was accepted and federation is honest");
}
return Ok(());
}
}
pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
&self,
address: bitcoin::Address<NetworkUnchecked>,
amount: bitcoin::Amount,
fee: PegOutFees,
extra_meta: M,
) -> anyhow::Result<OperationId> {
{
let operation_id = OperationId(thread_rng().gen());
let withdraw_output =
self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
let tx_builder = TransactionBuilder::new()
.with_output(self.client_ctx.make_client_output(withdraw_output));
let extra_meta =
serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
self.client_ctx
.finalize_and_submit_transaction(
operation_id,
WalletCommonInit::KIND.as_str(),
|_, change| WalletOperationMeta {
variant: WalletOperationMetaVariant::Withdraw {
address: address.clone(),
amount,
fee,
change,
},
extra_meta: extra_meta.clone(),
},
tx_builder,
)
.await?;
Ok(operation_id)
}
}
#[deprecated(
since = "0.4.0",
note = "RBF withdrawals are rejected by the federation"
)]
pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
&self,
rbf: Rbf,
extra_meta: M,
) -> anyhow::Result<OperationId> {
let operation_id = OperationId(thread_rng().gen());
let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
let tx_builder = TransactionBuilder::new()
.with_output(self.client_ctx.make_client_output(withdraw_output));
let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
self.client_ctx
.finalize_and_submit_transaction(
operation_id,
WalletCommonInit::KIND.as_str(),
|_, change| WalletOperationMeta {
variant: WalletOperationMetaVariant::RbfWithdraw {
rbf: rbf.clone(),
change,
},
extra_meta: extra_meta.clone(),
},
tx_builder,
)
.await?;
Ok(operation_id)
}
pub async fn subscribe_withdraw_updates(
&self,
operation_id: OperationId,
) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
let operation = self
.client_ctx
.get_operation(operation_id)
.await
.with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
bail!("Operation is not a wallet operation");
}
let operation_meta = operation.meta::<WalletOperationMeta>();
let (WalletOperationMetaVariant::Withdraw { change, .. }
| WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
else {
bail!("Operation is not a withdraw operation");
};
let mut operation_stream = self.notifier.subscribe(operation_id).await;
let client_ctx = self.client_ctx.clone();
Ok(
operation.outcome_or_updates(&self.client_ctx.global_db(), operation_id, || {
stream! {
match next_withdraw_state(&mut operation_stream).await {
Some(WithdrawStates::Created(_)) => {
yield WithdrawState::Created;
},
Some(s) => {
panic!("Unexpected state {s:?}")
},
None => return,
}
let _ = client_ctx
.await_primary_module_outputs(operation_id, change)
.await;
match next_withdraw_state(&mut operation_stream).await {
Some(WithdrawStates::Aborted(inner)) => {
yield WithdrawState::Failed(inner.error);
},
Some(WithdrawStates::Success(inner)) => {
yield WithdrawState::Succeeded(inner.txid);
},
Some(s) => {
panic!("Unexpected state {s:?}")
},
None => {},
}
}
}),
)
}
}
fn check_address(address: &Address<NetworkUnchecked>, network: Network) -> anyhow::Result<()> {
ensure!(
address.is_valid_for_network(network),
"Address isn't compatible with the federation's network: {network:?}"
);
Ok(())
}
async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
let index = dbtx
.get_value(&NextPegInTweakIndexKey)
.await
.unwrap_or_default();
dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
.await;
index
}
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
Deposit(DepositStateMachine),
Withdraw(WithdrawStateMachine),
}
impl IntoDynInstance for WalletClientStates {
type DynType = DynState;
fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
DynState::from_typed(instance_id, self)
}
}
impl State for WalletClientStates {
type ModuleContext = WalletClientContext;
fn transitions(
&self,
context: &Self::ModuleContext,
global_context: &DynGlobalClientContext,
) -> Vec<StateTransition<Self>> {
match self {
WalletClientStates::Deposit(sm) => {
sm_enum_variant_translation!(
sm.transitions(context, global_context),
WalletClientStates::Deposit
)
}
WalletClientStates::Withdraw(sm) => {
sm_enum_variant_translation!(
sm.transitions(context, global_context),
WalletClientStates::Withdraw
)
}
}
}
fn operation_id(&self) -> OperationId {
match self {
WalletClientStates::Deposit(sm) => sm.operation_id(),
WalletClientStates::Withdraw(sm) => sm.operation_id(),
}
}
}
const RECOVER_MAX_GAP: u64 = 10;
const RECOVER_NUM_IDX_ADD_TO_LAST_USED: u64 = RECOVER_MAX_GAP / 4 + 1;
#[derive(Clone, PartialEq, Eq, Debug)]
struct RecoverScanOutcome {
last_used_idx: Option<TweakIdx>,
new_start_idx: TweakIdx,
tweak_idxes_with_pegins: BTreeSet<TweakIdx>,
}
async fn recover_scan_idxes_for_activity<F, FF, T>(
previous_next_unused_idx: TweakIdx,
check_addr_history: F,
) -> anyhow::Result<RecoverScanOutcome>
where
F: Fn(TweakIdx) -> FF,
FF: Future<Output = anyhow::Result<Vec<T>>>,
{
let mut last_used_idx = if previous_next_unused_idx == TweakIdx::ZERO {
None
} else {
Some(
previous_next_unused_idx
.prev()
.expect("Check for not being zero"),
)
};
let mut cur_tweak_idx = previous_next_unused_idx;
let mut tweak_idxes_with_pegins = BTreeSet::new();
let new_start_idx = loop {
let gap_since_last_used = cur_tweak_idx - last_used_idx.unwrap_or_default();
if RECOVER_MAX_GAP <= gap_since_last_used {
break last_used_idx
.unwrap_or_default()
.advance(RECOVER_NUM_IDX_ADD_TO_LAST_USED);
}
let history = check_addr_history(cur_tweak_idx).await?;
if !history.is_empty() {
last_used_idx = Some(cur_tweak_idx);
tweak_idxes_with_pegins.insert(cur_tweak_idx);
}
cur_tweak_idx = cur_tweak_idx.next();
};
Ok(RecoverScanOutcome {
last_used_idx,
new_start_idx,
tweak_idxes_with_pegins,
})
}
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use super::*;
#[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread")]
async fn sanity_test_recover_inner() {
{
let last_checked = AtomicBool::new(false);
let last_checked = &last_checked;
assert_eq!(
recover_scan_idxes_for_activity(TweakIdx(0), |cur_idx| async move {
Ok(match cur_idx {
TweakIdx(9) => {
last_checked.store(true, Ordering::SeqCst);
vec![]
}
TweakIdx(10) => panic!("Shouldn't happen"),
TweakIdx(11) => {
vec![0usize] }
_ => vec![],
})
})
.await
.unwrap(),
RecoverScanOutcome {
last_used_idx: None,
new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
tweak_idxes_with_pegins: BTreeSet::from([])
}
);
assert!(last_checked.load(Ordering::SeqCst));
}
{
let last_checked = AtomicBool::new(false);
let last_checked = &last_checked;
assert_eq!(
recover_scan_idxes_for_activity(TweakIdx(10), |cur_idx| async move {
Ok(match cur_idx {
TweakIdx(10) => vec![()],
TweakIdx(19) => {
last_checked.store(true, Ordering::SeqCst);
vec![]
}
TweakIdx(20) => panic!("Shouldn't happen"),
_ => vec![],
})
})
.await
.unwrap(),
RecoverScanOutcome {
last_used_idx: Some(TweakIdx(10)),
new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
}
);
assert!(last_checked.load(Ordering::SeqCst));
}
assert_eq!(
recover_scan_idxes_for_activity(TweakIdx(0), |cur_idx| async move {
Ok(match cur_idx {
TweakIdx(6 | 15) => vec![()],
_ => vec![],
})
})
.await
.unwrap(),
RecoverScanOutcome {
last_used_idx: Some(TweakIdx(15)),
new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
}
);
assert_eq!(
recover_scan_idxes_for_activity(TweakIdx(10), |cur_idx| async move {
Ok(match cur_idx {
TweakIdx(8) => {
vec![()] }
TweakIdx(9) => {
panic!("Shouldn't happen")
}
_ => vec![],
})
})
.await
.unwrap(),
RecoverScanOutcome {
last_used_idx: Some(TweakIdx(9)),
new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
tweak_idxes_with_pegins: BTreeSet::from([])
}
);
assert_eq!(
recover_scan_idxes_for_activity(TweakIdx(10), |cur_idx| async move {
Ok(match cur_idx {
TweakIdx(9) => panic!("Shouldn't happen"),
TweakIdx(15) => vec![()],
_ => vec![],
})
})
.await
.unwrap(),
RecoverScanOutcome {
last_used_idx: Some(TweakIdx(15)),
new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
}
);
}
}