ln_gateway/gateway_module_v2/
mod.rs

1mod api;
2mod complete_sm;
3pub mod events;
4mod receive_sm;
5mod send_sm;
6
7use std::collections::BTreeMap;
8use std::fmt;
9use std::sync::Arc;
10
11use anyhow::{anyhow, ensure};
12use bitcoin::hashes::sha256;
13use bitcoin::secp256k1::Message;
14use events::{IncomingPaymentStarted, OutgoingPaymentStarted};
15use fedimint_api_client::api::DynModuleApi;
16use fedimint_client::module::init::{ClientModuleInit, ClientModuleInitArgs};
17use fedimint_client::module::recovery::NoModuleBackup;
18use fedimint_client::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
19use fedimint_client::sm::util::MapStateTransitions;
20use fedimint_client::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
21use fedimint_client::transaction::{
22    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
23};
24use fedimint_client::{sm_enum_variant_translation, DynGlobalClientContext};
25use fedimint_core::config::FederationId;
26use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
27use fedimint_core::db::DatabaseTransaction;
28use fedimint_core::encoding::{Decodable, Encodable};
29use fedimint_core::module::{
30    ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
31};
32use fedimint_core::secp256k1::Keypair;
33use fedimint_core::time::now;
34use fedimint_core::{apply, async_trait_maybe_send, secp256k1, Amount, OutPoint, PeerId};
35use fedimint_lnv2_common::config::LightningClientConfig;
36use fedimint_lnv2_common::contracts::{IncomingContract, PaymentImage};
37use fedimint_lnv2_common::gateway_api::SendPaymentPayload;
38use fedimint_lnv2_common::{
39    LightningCommonInit, LightningInvoice, LightningModuleTypes, LightningOutput, LightningOutputV0,
40};
41use futures::StreamExt;
42use receive_sm::{ReceiveSMState, ReceiveStateMachine};
43use secp256k1::schnorr::Signature;
44use send_sm::{SendSMState, SendStateMachine};
45use serde::{Deserialize, Serialize};
46use tpe::{AggregatePublicKey, PublicKeyShare};
47use tracing::{info, warn};
48
49use crate::gateway_module_v2::api::GatewayFederationApi;
50use crate::gateway_module_v2::complete_sm::{
51    CompleteSMCommon, CompleteSMState, CompleteStateMachine,
52};
53use crate::gateway_module_v2::receive_sm::ReceiveSMCommon;
54use crate::gateway_module_v2::send_sm::SendSMCommon;
55use crate::{Gateway, EXPIRATION_DELTA_MINIMUM_V2};
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct GatewayOperationMetaV2;
59
60#[derive(Debug, Clone)]
61pub struct GatewayClientInitV2 {
62    pub gateway: Arc<Gateway>,
63}
64
65impl ModuleInit for GatewayClientInitV2 {
66    type Common = LightningCommonInit;
67
68    async fn dump_database(
69        &self,
70        _dbtx: &mut DatabaseTransaction<'_>,
71        _prefix_names: Vec<String>,
72    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
73        Box::new(vec![].into_iter())
74    }
75}
76
77#[apply(async_trait_maybe_send!)]
78impl ClientModuleInit for GatewayClientInitV2 {
79    type Module = GatewayClientModuleV2;
80
81    fn supported_api_versions(&self) -> MultiApiVersion {
82        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
83            .expect("no version conflicts")
84    }
85
86    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
87        Ok(GatewayClientModuleV2 {
88            federation_id: *args.federation_id(),
89            cfg: args.cfg().clone(),
90            notifier: args.notifier().clone(),
91            client_ctx: args.context(),
92            module_api: args.module_api().clone(),
93            keypair: args
94                .module_root_secret()
95                .clone()
96                .to_secp_key(fedimint_core::secp256k1::SECP256K1),
97            gateway: self.gateway.clone(),
98        })
99    }
100}
101
102#[derive(Debug, Clone)]
103pub struct GatewayClientModuleV2 {
104    pub federation_id: FederationId,
105    pub cfg: LightningClientConfig,
106    pub notifier: ModuleNotifier<GatewayClientStateMachinesV2>,
107    pub client_ctx: ClientContext<Self>,
108    pub module_api: DynModuleApi,
109    pub keypair: Keypair,
110    pub gateway: Arc<Gateway>,
111}
112
113#[derive(Debug, Clone)]
114pub struct GatewayClientContextV2 {
115    pub module: GatewayClientModuleV2,
116    pub decoder: Decoder,
117    pub tpe_agg_pk: AggregatePublicKey,
118    pub tpe_pks: BTreeMap<PeerId, PublicKeyShare>,
119    pub gateway: Arc<Gateway>,
120}
121
122impl Context for GatewayClientContextV2 {
123    const KIND: Option<ModuleKind> = Some(fedimint_lnv2_common::KIND);
124}
125
126impl ClientModule for GatewayClientModuleV2 {
127    type Init = GatewayClientInitV2;
128    type Common = LightningModuleTypes;
129    type Backup = NoModuleBackup;
130    type ModuleStateMachineContext = GatewayClientContextV2;
131    type States = GatewayClientStateMachinesV2;
132
133    fn context(&self) -> Self::ModuleStateMachineContext {
134        GatewayClientContextV2 {
135            module: self.clone(),
136            decoder: self.decoder(),
137            tpe_agg_pk: self.cfg.tpe_agg_pk,
138            tpe_pks: self.cfg.tpe_pks.clone(),
139            gateway: self.gateway.clone(),
140        }
141    }
142    fn input_fee(
143        &self,
144        amount: Amount,
145        _input: &<Self::Common as ModuleCommon>::Input,
146    ) -> Option<Amount> {
147        Some(self.cfg.fee_consensus.fee(amount))
148    }
149
150    fn output_fee(
151        &self,
152        _amount: Amount,
153        output: &<Self::Common as ModuleCommon>::Output,
154    ) -> Option<Amount> {
155        let amount = match output.ensure_v0_ref().ok()? {
156            LightningOutputV0::Outgoing(contract) => contract.amount,
157            LightningOutputV0::Incoming(contract) => contract.commitment.amount,
158        };
159
160        Some(self.cfg.fee_consensus.fee(amount))
161    }
162}
163
164#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
165pub enum GatewayClientStateMachinesV2 {
166    Send(SendStateMachine),
167    Receive(ReceiveStateMachine),
168    Complete(CompleteStateMachine),
169}
170
171impl fmt::Display for GatewayClientStateMachinesV2 {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        match self {
174            GatewayClientStateMachinesV2::Send(send) => {
175                write!(f, "{send}")
176            }
177            GatewayClientStateMachinesV2::Receive(receive) => {
178                write!(f, "{receive}")
179            }
180            GatewayClientStateMachinesV2::Complete(complete) => {
181                write!(f, "{complete}")
182            }
183        }
184    }
185}
186
187impl IntoDynInstance for GatewayClientStateMachinesV2 {
188    type DynType = DynState;
189
190    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
191        DynState::from_typed(instance_id, self)
192    }
193}
194
195impl State for GatewayClientStateMachinesV2 {
196    type ModuleContext = GatewayClientContextV2;
197
198    fn transitions(
199        &self,
200        context: &Self::ModuleContext,
201        global_context: &DynGlobalClientContext,
202    ) -> Vec<StateTransition<Self>> {
203        match self {
204            GatewayClientStateMachinesV2::Send(state) => {
205                sm_enum_variant_translation!(
206                    state.transitions(context, global_context),
207                    GatewayClientStateMachinesV2::Send
208                )
209            }
210            GatewayClientStateMachinesV2::Receive(state) => {
211                sm_enum_variant_translation!(
212                    state.transitions(context, global_context),
213                    GatewayClientStateMachinesV2::Receive
214                )
215            }
216            GatewayClientStateMachinesV2::Complete(state) => {
217                sm_enum_variant_translation!(
218                    state.transitions(context, global_context),
219                    GatewayClientStateMachinesV2::Complete
220                )
221            }
222        }
223    }
224
225    fn operation_id(&self) -> OperationId {
226        match self {
227            GatewayClientStateMachinesV2::Send(state) => state.operation_id(),
228            GatewayClientStateMachinesV2::Receive(state) => state.operation_id(),
229            GatewayClientStateMachinesV2::Complete(state) => state.operation_id(),
230        }
231    }
232}
233
234#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Decodable, Encodable)]
235pub enum FinalReceiveState {
236    Rejected,
237    Success([u8; 32]),
238    Refunded,
239    Failure,
240}
241
242impl GatewayClientModuleV2 {
243    pub async fn send_payment(
244        &self,
245        payload: SendPaymentPayload,
246    ) -> anyhow::Result<Result<[u8; 32], Signature>> {
247        let operation_start = now();
248
249        // The operation id is equal to the contract id which also doubles as the
250        // message signed by the gateway via the forfeit signature to forfeit
251        // the gateways claim to a contract in case of cancellation. We only create a
252        // forfeit signature after we have started the send state machine to
253        // prevent replay attacks with a previously cancelled outgoing contract
254        let operation_id = OperationId::from_encodable(&payload.contract.clone());
255
256        if self.client_ctx.operation_exists(operation_id).await {
257            return Ok(self.subscribe_send(operation_id).await);
258        }
259
260        // Since the following four checks may only fail due to client side
261        // programming error we do not have to enable cancellation and can check
262        // them before we start the state machine.
263        ensure!(
264            payload.contract.claim_pk == self.keypair.public_key(),
265            "The outgoing contract is keyed to another gateway"
266        );
267
268        // This prevents DOS attacks where an attacker submits a different invoice.
269        ensure!(
270            secp256k1::SECP256K1
271                .verify_schnorr(
272                    &payload.auth,
273                    &Message::from_digest(
274                        *payload.invoice.consensus_hash::<sha256::Hash>().as_ref()
275                    ),
276                    &payload.contract.refund_pk.x_only_public_key().0,
277                )
278                .is_ok(),
279            "Invalid auth signature for the invoice data"
280        );
281
282        // We need to check that the contract has been confirmed by the federation
283        // before we start the state machine to prevent DOS attacks.
284        let max_delay = self
285            .module_api
286            .outgoing_contract_expiration(&payload.contract.contract_id())
287            .await
288            .map_err(|_| anyhow!("The gateway can not reach the federation"))?
289            .ok_or(anyhow!("The outgoing contract has not yet been confirmed"))?
290            .saturating_sub(EXPIRATION_DELTA_MINIMUM_V2);
291
292        let (payment_hash, amount) = match &payload.invoice {
293            LightningInvoice::Bolt11(invoice) => (
294                invoice.payment_hash(),
295                invoice
296                    .amount_milli_satoshis()
297                    .ok_or(anyhow!("Invoice is missing amount"))?,
298            ),
299        };
300
301        ensure!(
302            PaymentImage::Hash(*payment_hash) == payload.contract.payment_image,
303            "The invoices payment hash does not match the contracts payment hash"
304        );
305
306        let min_contract_amount = self
307            .gateway
308            .routing_info_v2(&payload.federation_id)
309            .await?
310            .ok_or(anyhow!("Routing Info not available"))?
311            .send_fee_minimum
312            .add_to(amount);
313
314        let send_sm = GatewayClientStateMachinesV2::Send(SendStateMachine {
315            common: SendSMCommon {
316                operation_id,
317                contract: payload.contract.clone(),
318                max_delay,
319                min_contract_amount,
320                invoice: payload.invoice,
321                claim_keypair: self.keypair,
322            },
323            state: SendSMState::Sending,
324        });
325
326        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
327        self.client_ctx
328            .manual_operation_start_dbtx(
329                &mut dbtx.to_ref_nc(),
330                operation_id,
331                LightningCommonInit::KIND.as_str(),
332                GatewayOperationMetaV2,
333                vec![self.client_ctx.make_dyn_state(send_sm)],
334            )
335            .await
336            .ok();
337
338        self.client_ctx
339            .log_event(
340                &mut dbtx,
341                OutgoingPaymentStarted {
342                    operation_start,
343                    outgoing_contract: payload.contract.clone(),
344                    min_contract_amount,
345                    invoice_amount: Amount::from_msats(amount),
346                    max_delay,
347                },
348            )
349            .await;
350        dbtx.commit_tx().await;
351
352        Ok(self.subscribe_send(operation_id).await)
353    }
354
355    pub async fn subscribe_send(&self, operation_id: OperationId) -> Result<[u8; 32], Signature> {
356        let mut stream = self.notifier.subscribe(operation_id).await;
357
358        loop {
359            if let Some(GatewayClientStateMachinesV2::Send(state)) = stream.next().await {
360                match state.state {
361                    SendSMState::Sending => {}
362                    SendSMState::Claiming(claiming) => {
363                        // This increases latency by one ordering and may eventually be removed;
364                        // however, at the current stage of lnv2 we prioritize the verification of
365                        // correctness above minimum latency.
366                        assert!(
367                            self.client_ctx
368                                .await_primary_module_outputs(operation_id, claiming.outpoints)
369                                .await
370                                .is_ok(),
371                            "Gateway Module V2 failed to claim outgoing contract with preimage"
372                        );
373
374                        return Ok(claiming.preimage);
375                    }
376                    SendSMState::Cancelled(cancelled) => {
377                        warn!("Outgoing lightning payment is cancelled {:?}", cancelled);
378
379                        let signature = self
380                            .keypair
381                            .sign_schnorr(state.common.contract.forfeit_message());
382
383                        assert!(state.common.contract.verify_forfeit_signature(&signature));
384
385                        return Err(signature);
386                    }
387                }
388            }
389        }
390    }
391
392    pub async fn relay_incoming_htlc(
393        &self,
394        payment_hash: sha256::Hash,
395        incoming_chan_id: u64,
396        htlc_id: u64,
397        contract: IncomingContract,
398        amount_msat: u64,
399    ) -> anyhow::Result<()> {
400        let operation_start = now();
401
402        let operation_id = OperationId::from_encodable(&contract);
403
404        if self.client_ctx.operation_exists(operation_id).await {
405            return Ok(());
406        }
407
408        let refund_keypair = self.keypair;
409
410        let client_output = ClientOutput::<LightningOutput> {
411            output: LightningOutput::V0(LightningOutputV0::Incoming(contract.clone())),
412            amount: contract.commitment.amount,
413        };
414        let commitment = contract.commitment.clone();
415        let client_output_sm = ClientOutputSM::<GatewayClientStateMachinesV2> {
416            state_machines: Arc::new(move |out_point_range: OutPointRange| {
417                assert_eq!(out_point_range.count(), 1);
418                let out_idx = out_point_range.start_idx();
419                vec![
420                    GatewayClientStateMachinesV2::Receive(ReceiveStateMachine {
421                        common: ReceiveSMCommon {
422                            operation_id,
423                            contract: contract.clone(),
424                            out_point: OutPoint {
425                                txid: out_point_range.txid(),
426                                out_idx,
427                            },
428                            refund_keypair,
429                        },
430                        state: ReceiveSMState::Funding,
431                    }),
432                    GatewayClientStateMachinesV2::Complete(CompleteStateMachine {
433                        common: CompleteSMCommon {
434                            operation_id,
435                            payment_hash,
436                            incoming_chan_id,
437                            htlc_id,
438                        },
439                        state: CompleteSMState::Pending,
440                    }),
441                ]
442            }),
443        };
444
445        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
446            vec![client_output],
447            vec![client_output_sm],
448        ));
449        let transaction = TransactionBuilder::new().with_outputs(client_output);
450
451        self.client_ctx
452            .finalize_and_submit_transaction(
453                operation_id,
454                LightningCommonInit::KIND.as_str(),
455                |_| GatewayOperationMetaV2,
456                transaction,
457            )
458            .await?;
459
460        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
461        self.client_ctx
462            .log_event(
463                &mut dbtx,
464                IncomingPaymentStarted {
465                    operation_start,
466                    incoming_contract_commitment: commitment,
467                    invoice_amount: Amount::from_msats(amount_msat),
468                },
469            )
470            .await;
471        dbtx.commit_tx().await;
472
473        Ok(())
474    }
475
476    pub async fn relay_direct_swap(
477        &self,
478        contract: IncomingContract,
479        amount_msat: u64,
480    ) -> anyhow::Result<FinalReceiveState> {
481        let operation_start = now();
482
483        let operation_id = OperationId::from_encodable(&contract);
484
485        if self.client_ctx.operation_exists(operation_id).await {
486            return Ok(self.await_receive(operation_id).await);
487        }
488
489        let refund_keypair = self.keypair;
490
491        let client_output = ClientOutput::<LightningOutput> {
492            output: LightningOutput::V0(LightningOutputV0::Incoming(contract.clone())),
493            amount: contract.commitment.amount,
494        };
495        let commitment = contract.commitment.clone();
496        let client_output_sm = ClientOutputSM::<GatewayClientStateMachinesV2> {
497            state_machines: Arc::new(move |out_point_range| {
498                assert_eq!(out_point_range.count(), 1);
499                let out_idx = out_point_range.start_idx();
500                vec![GatewayClientStateMachinesV2::Receive(ReceiveStateMachine {
501                    common: ReceiveSMCommon {
502                        operation_id,
503                        contract: contract.clone(),
504                        out_point: OutPoint {
505                            txid: out_point_range.txid(),
506                            out_idx,
507                        },
508                        refund_keypair,
509                    },
510                    state: ReceiveSMState::Funding,
511                })]
512            }),
513        };
514
515        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
516            vec![client_output],
517            vec![client_output_sm],
518        ));
519
520        let transaction = TransactionBuilder::new().with_outputs(client_output);
521
522        self.client_ctx
523            .finalize_and_submit_transaction(
524                operation_id,
525                LightningCommonInit::KIND.as_str(),
526                |_| GatewayOperationMetaV2,
527                transaction,
528            )
529            .await?;
530
531        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
532        self.client_ctx
533            .log_event(
534                &mut dbtx,
535                IncomingPaymentStarted {
536                    operation_start,
537                    incoming_contract_commitment: commitment,
538                    invoice_amount: Amount::from_msats(amount_msat),
539                },
540            )
541            .await;
542        dbtx.commit_tx().await;
543
544        Ok(self.await_receive(operation_id).await)
545    }
546
547    async fn await_receive(&self, operation_id: OperationId) -> FinalReceiveState {
548        let mut stream = self.notifier.subscribe(operation_id).await;
549
550        loop {
551            if let Some(GatewayClientStateMachinesV2::Receive(state)) = stream.next().await {
552                match state.state {
553                    ReceiveSMState::Funding => {}
554                    ReceiveSMState::Rejected(..) => return FinalReceiveState::Rejected,
555                    ReceiveSMState::Success(preimage) => {
556                        return FinalReceiveState::Success(preimage)
557                    }
558                    ReceiveSMState::Refunding(out_points) => {
559                        if self
560                            .client_ctx
561                            .await_primary_module_outputs(operation_id, out_points)
562                            .await
563                            .is_err()
564                        {
565                            return FinalReceiveState::Failure;
566                        }
567
568                        return FinalReceiveState::Refunded;
569                    }
570                    ReceiveSMState::Failure => return FinalReceiveState::Failure,
571                }
572            }
573        }
574    }
575
576    /// For the given `OperationId`, this function will wait until the Complete
577    /// state machine has finished or failed.
578    pub async fn await_completion(&self, operation_id: OperationId) {
579        let mut stream = self.notifier.subscribe(operation_id).await;
580
581        loop {
582            match stream.next().await {
583                Some(GatewayClientStateMachinesV2::Complete(state)) => {
584                    if state.state == CompleteSMState::Completed {
585                        info!(%state, "LNv2 completion state machine finished");
586                        return;
587                    }
588
589                    info!(%state, "Waiting for LNv2 completion state machine");
590                }
591                Some(GatewayClientStateMachinesV2::Receive(state)) => {
592                    info!(%state, "Waiting for LNv2 completion state machine");
593                    continue;
594                }
595                Some(state) => {
596                    warn!(%state, "Operation is not an LNv2 completion state machine");
597                    return;
598                }
599                None => return,
600            }
601        }
602    }
603}