ln_gateway/state_machine/
mod.rs

1mod complete;
2pub mod events;
3pub mod pay;
4
5use std::collections::BTreeMap;
6use std::fmt;
7use std::sync::Arc;
8use std::time::Duration;
9
10use anyhow::ensure;
11use async_stream::stream;
12use bitcoin::hashes::{sha256, Hash};
13use bitcoin::key::Secp256k1;
14use bitcoin::secp256k1::All;
15use events::{IncomingPaymentStarted, OutgoingPaymentStarted};
16use fedimint_api_client::api::DynModuleApi;
17use fedimint_client::derivable_secret::ChildId;
18use fedimint_client::module::init::{ClientModuleInit, ClientModuleInitArgs};
19use fedimint_client::module::recovery::NoModuleBackup;
20use fedimint_client::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
21use fedimint_client::oplog::UpdateStreamOrOutcome;
22use fedimint_client::sm::util::MapStateTransitions;
23use fedimint_client::sm::{Context, DynState, ModuleNotifier, State};
24use fedimint_client::transaction::{
25    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
26};
27use fedimint_client::{sm_enum_variant_translation, AddStateMachinesError, DynGlobalClientContext};
28use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
29use fedimint_core::db::{AutocommitError, DatabaseTransaction};
30use fedimint_core::encoding::{Decodable, Encodable};
31use fedimint_core::module::{ApiVersion, ModuleInit, MultiApiVersion};
32use fedimint_core::{apply, async_trait_maybe_send, secp256k1, Amount, OutPoint};
33use fedimint_lightning::{InterceptPaymentRequest, LightningContext};
34use fedimint_ln_client::api::LnFederationApi;
35use fedimint_ln_client::incoming::{
36    FundingOfferState, IncomingSmCommon, IncomingSmError, IncomingSmStates, IncomingStateMachine,
37};
38use fedimint_ln_client::pay::{PayInvoicePayload, PaymentData};
39use fedimint_ln_client::{
40    create_incoming_contract_output, LightningClientContext, LightningClientInit,
41    RealGatewayConnection,
42};
43use fedimint_ln_common::config::LightningClientConfig;
44use fedimint_ln_common::contracts::{ContractId, Preimage};
45use fedimint_ln_common::route_hints::RouteHint;
46use fedimint_ln_common::{
47    create_gateway_remove_message, LightningCommonInit, LightningGateway,
48    LightningGatewayAnnouncement, LightningModuleTypes, LightningOutput, LightningOutputV0,
49    RemoveGatewayRequest, KIND,
50};
51use futures::StreamExt;
52use lightning_invoice::RoutingFees;
53use secp256k1::Keypair;
54use serde::{Deserialize, Serialize};
55use tracing::{debug, error, info, warn};
56
57use self::complete::GatewayCompleteStateMachine;
58use self::pay::{
59    GatewayPayCommon, GatewayPayInvoice, GatewayPayStateMachine, GatewayPayStates,
60    OutgoingPaymentError,
61};
62use crate::state_machine::complete::{
63    GatewayCompleteCommon, GatewayCompleteStates, WaitForPreimageState,
64};
65use crate::Gateway;
66
/// The high-level state of an outgoing payment operation started with
/// [`GatewayClientModule::gateway_pay_bolt11_invoice`].
///
/// These are the updates yielded by
/// [`GatewayClientModule::gateway_subscribe_ln_pay`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum GatewayExtPayStates {
    /// First update of every subscription, yielded before any state machine
    /// transition has been observed.
    Created,
    /// The pay state machine produced a preimage; the associated outputs have
    /// not been awaited yet.
    Preimage {
        preimage: Preimage,
    },
    /// The outputs belonging to the preimage state were awaited successfully.
    Success {
        preimage: Preimage,
        out_points: Vec<OutPoint>,
    },
    /// The payment was canceled and the canceling transaction was accepted.
    Canceled {
        error: OutgoingPaymentError,
    },
    /// The payment failed, or the canceling transaction was not accepted;
    /// `error_message` carries a human readable description.
    Fail {
        error: OutgoingPaymentError,
        error_message: String,
    },
    /// The pay state machine reported that the offer for `contract_id` does
    /// not exist.
    OfferDoesNotExist {
        contract_id: ContractId,
    },
}
90
/// The high-level state of an intercepted HTLC operation started with
/// [`GatewayClientModule::gateway_handle_intercepted_htlc`].
///
/// These are the updates yielded by
/// [`GatewayClientModule::gateway_subscribe_ln_receive`], which is also used
/// for direct swaps between federations.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum GatewayExtReceiveStates {
    /// First update of every subscription, yielded before any state machine
    /// transition has been observed.
    Funding,
    /// The incoming state machine obtained the preimage.
    Preimage(Preimage),
    /// A refund was submitted and its outputs were awaited successfully.
    RefundSuccess {
        out_points: Vec<OutPoint>,
        error: IncomingSmError,
    },
    /// A refund was submitted but awaiting its outputs failed;
    /// `error_message` is the stringified failure.
    RefundError {
        error_message: String,
        error: IncomingSmError,
    },
    /// The incoming state machine reported that funding failed.
    FundingFailed {
        error: IncomingSmError,
    },
}
109
/// Operation metadata stored alongside the operations created by
/// [`GatewayClientModule`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GatewayMeta {
    /// Recorded by [`GatewayClientModule::gateway_pay_bolt11_invoice`].
    Pay,
    /// Recorded for intercepted HTLCs and direct swaps.
    Receive,
}
115
/// Initializer for [`GatewayClientModule`].
#[derive(Debug, Clone)]
pub struct GatewayClientInit {
    /// Copied into the module and published as `federation_index` in the
    /// gateway's registration announcement.
    pub federation_index: u64,
    /// Shared handle to the gateway, passed on to the module on init.
    pub gateway: Arc<Gateway>,
}
121
122impl ModuleInit for GatewayClientInit {
123    type Common = LightningCommonInit;
124
125    async fn dump_database(
126        &self,
127        _dbtx: &mut DatabaseTransaction<'_>,
128        _prefix_names: Vec<String>,
129    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
130        Box::new(vec![].into_iter())
131    }
132}
133
134#[apply(async_trait_maybe_send!)]
135impl ClientModuleInit for GatewayClientInit {
136    type Module = GatewayClientModule;
137
138    fn supported_api_versions(&self) -> MultiApiVersion {
139        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
140            .expect("no version conflicts")
141    }
142
143    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144        Ok(GatewayClientModule {
145            cfg: args.cfg().clone(),
146            notifier: args.notifier().clone(),
147            redeem_key: args
148                .module_root_secret()
149                .child_key(ChildId(0))
150                .to_secp_key(&fedimint_core::secp256k1::Secp256k1::new()),
151            module_api: args.module_api().clone(),
152            federation_index: self.federation_index,
153            client_ctx: args.context(),
154            gateway: self.gateway.clone(),
155        })
156    }
157}
158
/// Context handed to the gateway's state machines, created by
/// [`GatewayClientModule`]'s `context()` implementation.
#[derive(Debug, Clone)]
pub struct GatewayClientContext {
    /// The module's redeem key; also forwarded into the
    /// [`LightningClientContext`] built from this context.
    redeem_key: Keypair,
    /// Secp256k1 context, freshly created each time the context is built.
    secp: Secp256k1<All>,
    /// Decoder of the Lightning module.
    pub ln_decoder: Decoder,
    /// Notifier for the gateway's state machine updates.
    notifier: ModuleNotifier<GatewayClientStateMachines>,
    /// Shared handle to the gateway.
    gateway: Arc<Gateway>,
    /// The module's client context.
    pub client_ctx: ClientContext<GatewayClientModule>,
}
168
/// Marks [`GatewayClientContext`] as a state machine context of the
/// Lightning module kind.
impl Context for GatewayClientContext {
    // The gateway module reuses the module kind of the common Lightning module.
    const KIND: Option<ModuleKind> = Some(fedimint_ln_common::KIND);
}
172
173impl From<&GatewayClientContext> for LightningClientContext {
174    fn from(ctx: &GatewayClientContext) -> Self {
175        LightningClientContext {
176            ln_decoder: ctx.ln_decoder.clone(),
177            redeem_key: ctx.redeem_key,
178            gateway_conn: Arc::new(RealGatewayConnection::default()),
179        }
180    }
181}
182
/// Client side Lightning module **for the gateway**.
///
/// For the client side Lightning module for normal clients,
/// see [`fedimint_ln_client::LightningClientModule`]
#[derive(Debug)]
pub struct GatewayClientModule {
    /// Lightning module client config; source of the consensus fees and the
    /// federation's threshold public key.
    cfg: LightningClientConfig,
    /// Notifier used to subscribe to state machine updates of an operation.
    pub notifier: ModuleNotifier<GatewayClientStateMachines>,
    /// Key pair whose public key is announced as the gateway's redeem key.
    pub redeem_key: Keypair,
    /// Index published in the gateway's registration announcement.
    federation_index: u64,
    /// Module API used for registering/removing the gateway and for creating
    /// incoming contract outputs.
    module_api: DynModuleApi,
    /// Client context used for transactions, operations, events and the
    /// module database.
    client_ctx: ClientContext<Self>,
    /// Shared handle to the gateway.
    gateway: Arc<Gateway>,
}
197
198impl ClientModule for GatewayClientModule {
199    type Init = LightningClientInit;
200    type Common = LightningModuleTypes;
201    type Backup = NoModuleBackup;
202    type ModuleStateMachineContext = GatewayClientContext;
203    type States = GatewayClientStateMachines;
204
205    fn context(&self) -> Self::ModuleStateMachineContext {
206        Self::ModuleStateMachineContext {
207            redeem_key: self.redeem_key,
208            secp: Secp256k1::new(),
209            ln_decoder: self.decoder(),
210            notifier: self.notifier.clone(),
211            gateway: self.gateway.clone(),
212            client_ctx: self.client_ctx.clone(),
213        }
214    }
215
216    fn input_fee(
217        &self,
218        _amount: Amount,
219        _input: &<Self::Common as fedimint_core::module::ModuleCommon>::Input,
220    ) -> Option<Amount> {
221        Some(self.cfg.fee_consensus.contract_input)
222    }
223
224    fn output_fee(
225        &self,
226        _amount: Amount,
227        output: &<Self::Common as fedimint_core::module::ModuleCommon>::Output,
228    ) -> Option<Amount> {
229        match output.maybe_v0_ref()? {
230            LightningOutputV0::Contract(_) => Some(self.cfg.fee_consensus.contract_output),
231            LightningOutputV0::Offer(_) | LightningOutputV0::CancelOutgoing { .. } => {
232                Some(Amount::ZERO)
233            }
234        }
235    }
236}
237
238impl GatewayClientModule {
239    fn to_gateway_registration_info(
240        &self,
241        route_hints: Vec<RouteHint>,
242        ttl: Duration,
243        fees: RoutingFees,
244        lightning_context: LightningContext,
245    ) -> LightningGatewayAnnouncement {
246        LightningGatewayAnnouncement {
247            info: LightningGateway {
248                federation_index: self.federation_index,
249                gateway_redeem_key: self.redeem_key.public_key(),
250                node_pub_key: lightning_context.lightning_public_key,
251                lightning_alias: lightning_context.lightning_alias,
252                api: self.gateway.versioned_api.clone(),
253                route_hints,
254                fees,
255                gateway_id: self.gateway.gateway_id,
256                supports_private_payments: lightning_context.lnrpc.supports_private_payments(),
257            },
258            ttl,
259            vetted: false,
260        }
261    }
262
263    async fn create_funding_incoming_contract_output_from_htlc(
264        &self,
265        htlc: Htlc,
266    ) -> Result<
267        (
268            OperationId,
269            Amount,
270            ClientOutput<LightningOutputV0>,
271            ClientOutputSM<GatewayClientStateMachines>,
272            ContractId,
273        ),
274        IncomingSmError,
275    > {
276        let operation_id = OperationId(htlc.payment_hash.to_byte_array());
277        let (incoming_output, amount, contract_id) = create_incoming_contract_output(
278            &self.module_api,
279            htlc.payment_hash,
280            htlc.outgoing_amount_msat,
281            &self.redeem_key,
282        )
283        .await?;
284
285        let client_output = ClientOutput::<LightningOutputV0> {
286            output: incoming_output,
287            amount,
288        };
289        let client_output_sm = ClientOutputSM::<GatewayClientStateMachines> {
290            state_machines: Arc::new(move |out_point_range: OutPointRange| {
291                assert_eq!(out_point_range.count(), 1);
292                vec![
293                    GatewayClientStateMachines::Receive(IncomingStateMachine {
294                        common: IncomingSmCommon {
295                            operation_id,
296                            contract_id,
297                            payment_hash: htlc.payment_hash,
298                        },
299                        state: IncomingSmStates::FundingOffer(FundingOfferState {
300                            txid: out_point_range.txid(),
301                        }),
302                    }),
303                    GatewayClientStateMachines::Complete(GatewayCompleteStateMachine {
304                        common: GatewayCompleteCommon {
305                            operation_id,
306                            payment_hash: htlc.payment_hash,
307                            incoming_chan_id: htlc.incoming_chan_id,
308                            htlc_id: htlc.htlc_id,
309                        },
310                        state: GatewayCompleteStates::WaitForPreimage(WaitForPreimageState),
311                    }),
312                ]
313            }),
314        };
315        Ok((
316            operation_id,
317            amount,
318            client_output,
319            client_output_sm,
320            contract_id,
321        ))
322    }
323
324    async fn create_funding_incoming_contract_output_from_swap(
325        &self,
326        swap: SwapParameters,
327    ) -> Result<
328        (
329            OperationId,
330            ClientOutput<LightningOutputV0>,
331            ClientOutputSM<GatewayClientStateMachines>,
332        ),
333        IncomingSmError,
334    > {
335        let payment_hash = swap.payment_hash;
336        let operation_id = OperationId(payment_hash.to_byte_array());
337        let (incoming_output, amount, contract_id) = create_incoming_contract_output(
338            &self.module_api,
339            payment_hash,
340            swap.amount_msat,
341            &self.redeem_key,
342        )
343        .await?;
344
345        let client_output = ClientOutput::<LightningOutputV0> {
346            output: incoming_output,
347            amount,
348        };
349        let client_output_sm = ClientOutputSM::<GatewayClientStateMachines> {
350            state_machines: Arc::new(move |out_point_range| {
351                assert_eq!(out_point_range.count(), 1);
352                vec![GatewayClientStateMachines::Receive(IncomingStateMachine {
353                    common: IncomingSmCommon {
354                        operation_id,
355                        contract_id,
356                        payment_hash,
357                    },
358                    state: IncomingSmStates::FundingOffer(FundingOfferState {
359                        txid: out_point_range.txid(),
360                    }),
361                })]
362            }),
363        };
364        Ok((operation_id, client_output, client_output_sm))
365    }
366
367    /// Register gateway with federation
368    pub async fn try_register_with_federation(
369        &self,
370        route_hints: Vec<RouteHint>,
371        time_to_live: Duration,
372        fees: RoutingFees,
373        lightning_context: LightningContext,
374    ) {
375        let registration_info =
376            self.to_gateway_registration_info(route_hints, time_to_live, fees, lightning_context);
377        let gateway_id = registration_info.info.gateway_id;
378
379        let federation_id = self
380            .client_ctx
381            .get_config()
382            .await
383            .global
384            .calculate_federation_id();
385        if let Err(e) = self.module_api.register_gateway(&registration_info).await {
386            warn!(
387                ?e,
388                "Failed to register gateway {gateway_id} with federation {federation_id}"
389            );
390        } else {
391            info!("Successfully registered gateway {gateway_id} with federation {federation_id}");
392        }
393    }
394
395    /// Attempts to remove a gateway's registration from the federation. Since
396    /// removing gateway registrations is best effort, this does not return
397    /// an error and simply emits a warning when the registration cannot be
398    /// removed.
399    pub async fn remove_from_federation(&self, gateway_keypair: Keypair) {
400        // Removing gateway registrations is best effort, so just emit a warning if it
401        // fails
402        if let Err(e) = self.remove_from_federation_inner(gateway_keypair).await {
403            let gateway_id = gateway_keypair.public_key();
404            let federation_id = self
405                .client_ctx
406                .get_config()
407                .await
408                .global
409                .calculate_federation_id();
410            warn!("Failed to remove gateway {gateway_id} from federation {federation_id}: {e:?}");
411        }
412    }
413
414    /// Retrieves the signing challenge from each federation peer. Since each
415    /// peer maintains their own list of registered gateways, the gateway
416    /// needs to provide a signature that is signed by the private key of the
417    /// gateway id to remove the registration.
418    async fn remove_from_federation_inner(&self, gateway_keypair: Keypair) -> anyhow::Result<()> {
419        let gateway_id = gateway_keypair.public_key();
420        let challenges = self
421            .module_api
422            .get_remove_gateway_challenge(gateway_id)
423            .await;
424
425        let fed_public_key = self.cfg.threshold_pub_key;
426        let signatures = challenges
427            .into_iter()
428            .filter_map(|(peer_id, challenge)| {
429                let msg = create_gateway_remove_message(fed_public_key, peer_id, challenge?);
430                let signature = gateway_keypair.sign_schnorr(msg);
431                Some((peer_id, signature))
432            })
433            .collect::<BTreeMap<_, _>>();
434
435        let remove_gateway_request = RemoveGatewayRequest {
436            gateway_id,
437            signatures,
438        };
439
440        self.module_api.remove_gateway(remove_gateway_request).await;
441
442        Ok(())
443    }
444
445    /// Attempt fulfill HTLC by buying preimage from the federation
446    pub async fn gateway_handle_intercepted_htlc(&self, htlc: Htlc) -> anyhow::Result<OperationId> {
447        debug!("Handling intercepted HTLC {htlc:?}");
448        let (operation_id, amount, client_output, client_output_sm, contract_id) = self
449            .create_funding_incoming_contract_output_from_htlc(htlc.clone())
450            .await?;
451
452        let output = ClientOutput {
453            output: LightningOutput::V0(client_output.output),
454            amount,
455        };
456
457        let tx = TransactionBuilder::new().with_outputs(self.client_ctx.make_client_outputs(
458            ClientOutputBundle::new(vec![output], vec![client_output_sm]),
459        ));
460        let operation_meta_gen = |_: OutPointRange| GatewayMeta::Receive;
461        self.client_ctx
462            .finalize_and_submit_transaction(operation_id, KIND.as_str(), operation_meta_gen, tx)
463            .await?;
464        debug!(?operation_id, "Submitted transaction for HTLC {htlc:?}");
465        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
466        self.client_ctx
467            .log_event(
468                &mut dbtx,
469                IncomingPaymentStarted {
470                    contract_id,
471                    payment_hash: htlc.payment_hash,
472                    invoice_amount: htlc.outgoing_amount_msat,
473                    contract_amount: amount,
474                    operation_id,
475                },
476            )
477            .await;
478        dbtx.commit_tx().await;
479        Ok(operation_id)
480    }
481
482    /// Attempt buying preimage from this federation in order to fulfill a pay
483    /// request in another federation served by this gateway. In direct swap
484    /// scenario, the gateway DOES NOT send payment over the lightning network
485    async fn gateway_handle_direct_swap(
486        &self,
487        swap_params: SwapParameters,
488    ) -> anyhow::Result<OperationId> {
489        debug!("Handling direct swap {swap_params:?}");
490        let (operation_id, client_output, client_output_sm) = self
491            .create_funding_incoming_contract_output_from_swap(swap_params.clone())
492            .await?;
493
494        let output = ClientOutput {
495            output: LightningOutput::V0(client_output.output),
496            amount: client_output.amount,
497        };
498        let tx = TransactionBuilder::new().with_outputs(self.client_ctx.make_client_outputs(
499            ClientOutputBundle::new(vec![output], vec![client_output_sm]),
500        ));
501        let operation_meta_gen = |_: OutPointRange| GatewayMeta::Receive;
502        self.client_ctx
503            .finalize_and_submit_transaction(operation_id, KIND.as_str(), operation_meta_gen, tx)
504            .await?;
505        debug!(
506            ?operation_id,
507            "Submitted transaction for direct swap {swap_params:?}"
508        );
509        Ok(operation_id)
510    }
511
512    /// Subscribe to updates when the gateway is handling an intercepted HTLC,
513    /// or direct swap between federations
514    pub async fn gateway_subscribe_ln_receive(
515        &self,
516        operation_id: OperationId,
517    ) -> anyhow::Result<UpdateStreamOrOutcome<GatewayExtReceiveStates>> {
518        let operation = self.client_ctx.get_operation(operation_id).await?;
519        let mut stream = self.notifier.subscribe(operation_id).await;
520        let client_ctx = self.client_ctx.clone();
521
522        Ok(self.client_ctx.outcome_or_updates(&operation, operation_id, || {
523            stream! {
524
525                yield GatewayExtReceiveStates::Funding;
526
527                let state = loop {
528                    debug!("Getting next ln receive state for {}", operation_id.fmt_short());
529                    if let Some(GatewayClientStateMachines::Receive(state)) = stream.next().await {
530                        match state.state {
531                            IncomingSmStates::Preimage(preimage) =>{
532                                debug!(?operation_id, "Received preimage");
533                                break GatewayExtReceiveStates::Preimage(preimage)
534                            },
535                            IncomingSmStates::RefundSubmitted { out_points, error } => {
536                                debug!(?operation_id, "Refund submitted for {out_points:?} {error}");
537                                match client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await {
538                                    Ok(()) => {
539                                        debug!(?operation_id, "Refund success");
540                                        break GatewayExtReceiveStates::RefundSuccess { out_points, error }
541                                    },
542                                    Err(e) => {
543                                        warn!(?operation_id, "Got failure {e:?} while awaiting for refund outputs {out_points:?}");
544                                        break GatewayExtReceiveStates::RefundError{ error_message: e.to_string(), error }
545                                    },
546                                }
547                            },
548                            IncomingSmStates::FundingFailed { error } => {
549                                warn!(?operation_id, "Funding failed: {error:?}");
550                                break GatewayExtReceiveStates::FundingFailed{ error }
551                            },
552                            other => {
553                                debug!("Got state {other:?} while awaiting for output of {}", operation_id.fmt_short());
554                            }
555                        }
556                    }
557                };
558                yield state;
559            }
560        }))
561    }
562
563    /// For the given `OperationId`, this function will wait until the Complete
564    /// state machine has finished or failed.
565    pub async fn await_completion(&self, operation_id: OperationId) {
566        let mut stream = self.notifier.subscribe(operation_id).await;
567        loop {
568            match stream.next().await {
569                Some(GatewayClientStateMachines::Complete(state)) => match state.state {
570                    GatewayCompleteStates::HtlcFinished => {
571                        info!(%state, "LNv1 completion state machine finished");
572                        return;
573                    }
574                    GatewayCompleteStates::Failure => {
575                        error!(%state, "LNv1 completion state machine failed");
576                        return;
577                    }
578                    _ => {
579                        info!(%state, "Waiting for LNv1 completion state machine");
580                        continue;
581                    }
582                },
583                Some(GatewayClientStateMachines::Receive(state)) => {
584                    info!(%state, "Waiting for LNv1 completion state machine");
585                    continue;
586                }
587                Some(state) => {
588                    warn!(%state, "Operation is not an LNv1 completion state machine");
589                    return;
590                }
591                None => return,
592            }
593        }
594    }
595
596    /// Pay lightning invoice on behalf of federation user
597    pub async fn gateway_pay_bolt11_invoice(
598        &self,
599        pay_invoice_payload: PayInvoicePayload,
600    ) -> anyhow::Result<OperationId> {
601        let payload = pay_invoice_payload.clone();
602        let lightning_context = self.gateway.get_lightning_context().await?;
603
604        if matches!(
605            pay_invoice_payload.payment_data,
606            PaymentData::PrunedInvoice { .. }
607        ) {
608            ensure!(
609                lightning_context.lnrpc.supports_private_payments(),
610                "Private payments are not supported by the lightning node"
611            );
612        }
613
614        self.client_ctx.module_db()
615            .autocommit(
616                |dbtx, _| {
617                    Box::pin(async {
618                        let operation_id = OperationId(payload.contract_id.to_byte_array());
619
620                        self.client_ctx.log_event(dbtx, OutgoingPaymentStarted {
621                            contract_id: payload.contract_id,
622                            invoice_amount: payload.payment_data.amount().expect("LNv1 invoices should have an amount"),
623                            operation_id,
624                        }).await;
625
626                        let state_machines =
627                            vec![GatewayClientStateMachines::Pay(GatewayPayStateMachine {
628                                common: GatewayPayCommon { operation_id },
629                                state: GatewayPayStates::PayInvoice(GatewayPayInvoice {
630                                    pay_invoice_payload: payload.clone(),
631                                }),
632                            })];
633
634                        let dyn_states = state_machines
635                            .into_iter()
636                            .map(|s| self.client_ctx.make_dyn(s))
637                            .collect();
638
639                            match self.client_ctx.add_state_machines_dbtx(dbtx, dyn_states).await {
640                                Ok(()) => {
641                                    self.client_ctx
642                                        .add_operation_log_entry_dbtx(
643                                            dbtx,
644                                            operation_id,
645                                            KIND.as_str(),
646                                            GatewayMeta::Pay,
647                                        )
648                                        .await;
649                                }
650                                Err(AddStateMachinesError::StateAlreadyExists) => {
651                                    info!("State machine for operation {} already exists, will not add a new one", operation_id.fmt_short());
652                                }
653                                Err(other) => {
654                                    anyhow::bail!("Failed to add state machines: {other:?}")
655                                }
656                            }
657                            Ok(operation_id)
658                    })
659                },
660                Some(100),
661            )
662            .await
663            .map_err(|e| match e {
664                AutocommitError::ClosureError { error, .. } => error,
665                AutocommitError::CommitFailed { last_error, .. } => {
666                    anyhow::anyhow!("Commit to DB failed: {last_error}")
667                }
668            })
669    }
670
671    pub async fn gateway_subscribe_ln_pay(
672        &self,
673        operation_id: OperationId,
674    ) -> anyhow::Result<UpdateStreamOrOutcome<GatewayExtPayStates>> {
675        let mut stream = self.notifier.subscribe(operation_id).await;
676        let operation = self.client_ctx.get_operation(operation_id).await?;
677        let client_ctx = self.client_ctx.clone();
678
679        Ok(self.client_ctx.outcome_or_updates(&operation, operation_id, || {
680            stream! {
681                yield GatewayExtPayStates::Created;
682
683                loop {
684                    debug!("Getting next ln pay state for {}", operation_id.fmt_short());
685                    if let Some(GatewayClientStateMachines::Pay(state)) = stream.next().await {
686                        match state.state {
687                            GatewayPayStates::Preimage(out_points, preimage) => {
688                                yield GatewayExtPayStates::Preimage{ preimage: preimage.clone() };
689
690                                match client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await {
691                                    Ok(()) => {
692                                        debug!(?operation_id, "Success");
693                                        yield GatewayExtPayStates::Success{ preimage: preimage.clone(), out_points };
694                                        return;
695
696                                    }
697                                    Err(e) => {
698                                        warn!(?operation_id, "Got failure {e:?} while awaiting for outputs {out_points:?}");
699                                        // TODO: yield something here?
700                                    }
701                                }
702                            }
703                            GatewayPayStates::Canceled { txid, contract_id, error } => {
704                                debug!(?operation_id, "Trying to cancel contract {contract_id:?} due to {error:?}");
705                                match client_ctx.transaction_updates(operation_id).await.await_tx_accepted(txid).await {
706                                    Ok(()) => {
707                                        debug!(?operation_id, "Canceled contract {contract_id:?} due to {error:?}");
708                                        yield GatewayExtPayStates::Canceled{ error };
709                                        return;
710                                    }
711                                    Err(e) => {
712                                        warn!(?operation_id, "Got failure {e:?} while awaiting for transaction {txid} to be accepted for");
713                                        yield GatewayExtPayStates::Fail { error, error_message: format!("Refund transaction {txid} was not accepted by the federation. OperationId: {} Error: {e:?}", operation_id.fmt_short()) };
714                                    }
715                                }
716                            }
717                            GatewayPayStates::OfferDoesNotExist(contract_id) => {
718                                warn!("Yielding OfferDoesNotExist state for {} and contract {contract_id}", operation_id.fmt_short());
719                                yield GatewayExtPayStates::OfferDoesNotExist { contract_id };
720                            }
721                            GatewayPayStates::Failed{ error, error_message } => {
722                                warn!("Yielding Fail state for {} due to {error:?} {error_message:?}", operation_id.fmt_short());
723                                yield GatewayExtPayStates::Fail{ error, error_message };
724                            },
725                            GatewayPayStates::PayInvoice(_) => {
726                                debug!("Got initial state PayInvoice while awaiting for output of {}", operation_id.fmt_short());
727                            }
728                            other => {
729                                info!("Got state {other:?} while awaiting for output of {}", operation_id.fmt_short());
730                            }
731                        }
732                    } else {
733                        warn!("Got None while getting next ln pay state for {}", operation_id.fmt_short());
734                    }
735                }
736            }
737        }))
738    }
739}
740
741#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
742pub enum GatewayClientStateMachines {
743    Pay(GatewayPayStateMachine),
744    Receive(IncomingStateMachine),
745    Complete(GatewayCompleteStateMachine),
746}
747
748impl fmt::Display for GatewayClientStateMachines {
749    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750        match self {
751            GatewayClientStateMachines::Pay(pay) => {
752                write!(f, "{pay}")
753            }
754            GatewayClientStateMachines::Receive(receive) => {
755                write!(f, "{receive}")
756            }
757            GatewayClientStateMachines::Complete(complete) => {
758                write!(f, "{complete}")
759            }
760        }
761    }
762}
763
764impl IntoDynInstance for GatewayClientStateMachines {
765    type DynType = DynState;
766
767    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
768        DynState::from_typed(instance_id, self)
769    }
770}
771
772impl State for GatewayClientStateMachines {
773    type ModuleContext = GatewayClientContext;
774
775    fn transitions(
776        &self,
777        context: &Self::ModuleContext,
778        global_context: &DynGlobalClientContext,
779    ) -> Vec<fedimint_client::sm::StateTransition<Self>> {
780        match self {
781            GatewayClientStateMachines::Pay(pay_state) => {
782                sm_enum_variant_translation!(
783                    pay_state.transitions(context, global_context),
784                    GatewayClientStateMachines::Pay
785                )
786            }
787            GatewayClientStateMachines::Receive(receive_state) => {
788                sm_enum_variant_translation!(
789                    receive_state.transitions(&context.into(), global_context),
790                    GatewayClientStateMachines::Receive
791                )
792            }
793            GatewayClientStateMachines::Complete(complete_state) => {
794                sm_enum_variant_translation!(
795                    complete_state.transitions(context, global_context),
796                    GatewayClientStateMachines::Complete
797                )
798            }
799        }
800    }
801
802    fn operation_id(&self) -> fedimint_core::core::OperationId {
803        match self {
804            GatewayClientStateMachines::Pay(pay_state) => pay_state.operation_id(),
805            GatewayClientStateMachines::Receive(receive_state) => receive_state.operation_id(),
806            GatewayClientStateMachines::Complete(complete_state) => complete_state.operation_id(),
807        }
808    }
809}
810
811#[derive(Debug, Clone, Eq, PartialEq)]
812pub struct Htlc {
813    /// The HTLC payment hash.
814    pub payment_hash: sha256::Hash,
815    /// The incoming HTLC amount in millisatoshi.
816    pub incoming_amount_msat: Amount,
817    /// The outgoing HTLC amount in millisatoshi
818    pub outgoing_amount_msat: Amount,
819    /// The incoming HTLC expiry
820    pub incoming_expiry: u32,
821    /// The short channel id of the HTLC.
822    pub short_channel_id: Option<u64>,
823    /// The id of the incoming channel
824    pub incoming_chan_id: u64,
825    /// The index of the incoming htlc in the incoming channel
826    pub htlc_id: u64,
827}
828
829impl TryFrom<InterceptPaymentRequest> for Htlc {
830    type Error = anyhow::Error;
831
832    fn try_from(s: InterceptPaymentRequest) -> Result<Self, Self::Error> {
833        Ok(Self {
834            payment_hash: s.payment_hash,
835            incoming_amount_msat: Amount::from_msats(s.amount_msat),
836            outgoing_amount_msat: Amount::from_msats(s.amount_msat),
837            incoming_expiry: s.expiry,
838            short_channel_id: s.short_channel_id,
839            incoming_chan_id: s.incoming_chan_id,
840            htlc_id: s.htlc_id,
841        })
842    }
843}
844
845#[derive(Debug, Clone)]
846struct SwapParameters {
847    payment_hash: sha256::Hash,
848    amount_msat: Amount,
849}
850
851impl TryFrom<PaymentData> for SwapParameters {
852    type Error = anyhow::Error;
853
854    fn try_from(s: PaymentData) -> Result<Self, Self::Error> {
855        let payment_hash = s.payment_hash();
856        let amount_msat = s
857            .amount()
858            .ok_or_else(|| anyhow::anyhow!("Amountless invoice cannot be used in direct swap"))?;
859        Ok(Self {
860            payment_hash,
861            amount_msat,
862        })
863    }
864}