ln_gateway/lightning/
ldk.rs

1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bitcoin::hashes::Hash;
8use bitcoin::{Network, OutPoint};
9use fedimint_bip39::Mnemonic;
10use fedimint_core::envs::is_env_var_set;
11use fedimint_core::task::{block_in_place, TaskGroup, TaskHandle};
12use fedimint_core::{Amount, BitcoinAmountOrAll};
13use fedimint_ln_common::contracts::Preimage;
14use ldk_node::config::EsploraSyncConfig;
15use ldk_node::lightning::ln::msgs::SocketAddress;
16use ldk_node::lightning::ln::PaymentHash;
17use ldk_node::lightning::routing::gossip::NodeAlias;
18use ldk_node::payment::{PaymentKind, PaymentStatus, SendingParameters};
19use lightning::ln::channelmanager::PaymentId;
20use lightning::ln::PaymentPreimage;
21use lightning::util::scid_utils::scid_from_parts;
22use lightning_invoice::Bolt11Invoice;
23use tokio::sync::mpsc::Sender;
24use tokio_stream::wrappers::ReceiverStream;
25use tracing::{error, info};
26
27use super::{ChannelInfo, ILnRpcClient, LightningRpcError, RouteHtlcStream};
28use crate::lightning::{
29    CloseChannelsWithPeerResponse, CreateInvoiceRequest, CreateInvoiceResponse,
30    GetBalancesResponse, GetLnOnchainAddressResponse, GetNodeInfoResponse, GetRouteHintsResponse,
31    InterceptPaymentRequest, InterceptPaymentResponse, InvoiceDescription, OpenChannelResponse,
32    PayInvoiceResponse, PaymentAction, SendOnchainResponse,
33};
34use crate::rpc::{CloseChannelsWithPeerPayload, OpenChannelPayload, SendOnchainPayload};
35
36pub struct GatewayLdkClient {
37    /// The underlying lightning node.
38    node: Arc<ldk_node::Node>,
39
40    /// The client for querying data about the blockchain.
41    esplora_client: esplora_client::AsyncClient,
42
43    task_group: TaskGroup,
44
45    /// The HTLC stream, until it is taken by calling
46    /// `ILnRpcClient::route_htlcs`.
47    htlc_stream_receiver_or: Option<tokio::sync::mpsc::Receiver<InterceptPaymentRequest>>,
48
49    /// Lock pool used to ensure that our implementation of `ILnRpcClient::pay`
50    /// doesn't allow for multiple simultaneous calls with the same invoice to
51    /// execute in parallel. This helps ensure that the function is idempotent.
52    outbound_lightning_payment_lock_pool: lockable::LockPool<PaymentId>,
53}
54
55impl std::fmt::Debug for GatewayLdkClient {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("GatewayLdkClient").finish_non_exhaustive()
58    }
59}
60
61impl GatewayLdkClient {
62    /// Creates a new `GatewayLdkClient` instance and starts the underlying
63    /// lightning node. All resources, including the lightning node, will be
64    /// cleaned up when the returned `GatewayLdkClient` instance is dropped.
65    /// There's no need to manually stop the node.
66    pub fn new(
67        data_dir: &Path,
68        esplora_server_url: &str,
69        network: Network,
70        lightning_port: u16,
71        mnemonic: Mnemonic,
72    ) -> anyhow::Result<Self> {
73        // In devimint, gateways must allow for other gateways to open channels to them.
74        // To ensure this works, we must set a node alias to signal to ldk-node that we
75        // should accept incoming public channels. However, on mainnet we can disable
76        // this for better privacy.
77        let node_alias = if network == Network::Bitcoin {
78            None
79        } else {
80            let alias = format!("{network} LDK Gateway");
81            let mut bytes = [0u8; 32];
82            bytes[..alias.as_bytes().len()].copy_from_slice(alias.as_bytes());
83            Some(NodeAlias(bytes))
84        };
85
86        let mut node_builder = ldk_node::Builder::from_config(ldk_node::config::Config {
87            network,
88            listening_addresses: Some(vec![SocketAddress::TcpIpV4 {
89                addr: [0, 0, 0, 0],
90                port: lightning_port,
91            }]),
92            node_alias,
93            ..Default::default()
94        });
95        node_builder
96            .set_entropy_bip39_mnemonic(mnemonic, None)
97            .set_chain_source_esplora(
98                esplora_server_url.to_string(),
99                Some(EsploraSyncConfig {
100                    // TODO: Remove these and rely on the default values.
101                    // See here for details: https://github.com/lightningdevkit/ldk-node/issues/339#issuecomment-2344230472
102                    onchain_wallet_sync_interval_secs: 10,
103                    lightning_wallet_sync_interval_secs: 10,
104                    ..Default::default()
105                }),
106            );
107        let Some(data_dir_str) = data_dir.to_str() else {
108            return Err(anyhow::anyhow!("Invalid data dir path"));
109        };
110        node_builder.set_storage_dir_path(data_dir_str.to_string());
111
112        let node = Arc::new(node_builder.build()?);
113        // TODO: Call `start_with_runtime()` instead of `start()`.
114        // See https://github.com/fedimint/fedimint/issues/6159
115        node.start().map_err(|e| {
116            error!(?e, "Failed to start LDK Node");
117            LightningRpcError::FailedToConnect
118        })?;
119
120        let (htlc_stream_sender, htlc_stream_receiver) = tokio::sync::mpsc::channel(1024);
121        let task_group = TaskGroup::new();
122
123        let node_clone = node.clone();
124        task_group.spawn("ldk lightning node event handler", |handle| async move {
125            loop {
126                Self::handle_next_event(&node_clone, &htlc_stream_sender, &handle).await;
127            }
128        });
129
130        Ok(GatewayLdkClient {
131            node,
132            esplora_client: esplora_client::Builder::new(esplora_server_url).build_async()?,
133            task_group,
134            htlc_stream_receiver_or: Some(htlc_stream_receiver),
135            outbound_lightning_payment_lock_pool: lockable::LockPool::new(),
136        })
137    }
138
139    async fn handle_next_event(
140        node: &ldk_node::Node,
141        htlc_stream_sender: &Sender<InterceptPaymentRequest>,
142        handle: &TaskHandle,
143    ) {
144        // We manually check for task termination in case we receive a payment while the
145        // task is shutting down. In that case, we want to finish the payment
146        // before shutting this task down.
147        let event = tokio::select! {
148            event = node.next_event_async() => {
149                event
150            }
151            () = handle.make_shutdown_rx() => {
152                return;
153            }
154        };
155
156        if let ldk_node::Event::PaymentClaimable {
157            payment_id: _,
158            payment_hash,
159            claimable_amount_msat,
160            claim_deadline,
161        } = event
162        {
163            if let Err(e) = htlc_stream_sender
164                .send(InterceptPaymentRequest {
165                    payment_hash: Hash::from_slice(&payment_hash.0).expect("Failed to create Hash"),
166                    amount_msat: claimable_amount_msat,
167                    expiry: claim_deadline.unwrap_or_default(),
168                    short_channel_id: None,
169                    incoming_chan_id: 0,
170                    htlc_id: 0,
171                })
172                .await
173            {
174                error!(?e, "Failed send InterceptHtlcRequest to stream");
175            }
176        }
177
178        // The `PaymentClaimable` event is the only event type that we are interested
179        // in. We can safely ignore all other events.
180        node.event_handled();
181    }
182
183    /// Converts a transaction outpoint to a short channel ID by querying the
184    /// blockchain.
185    async fn outpoint_to_scid(&self, funding_txo: OutPoint) -> anyhow::Result<u64> {
186        let block_height = self
187            .esplora_client
188            .get_merkle_proof(&funding_txo.txid)
189            .await?
190            .ok_or(anyhow::anyhow!("Failed to get merkle proof"))?
191            .block_height;
192
193        let block_hash = self.esplora_client.get_block_hash(block_height).await?;
194
195        let block = self
196            .esplora_client
197            .get_block_by_hash(&block_hash)
198            .await?
199            .ok_or(anyhow::anyhow!("Failed to get block"))?;
200
201        let tx_index = block
202            .txdata
203            .iter()
204            .enumerate()
205            .find(|(_, tx)| tx.compute_txid() == funding_txo.txid)
206            .ok_or(anyhow::anyhow!("Failed to find transaction"))?
207            .0 as u32;
208
209        let output_index = funding_txo.vout;
210
211        scid_from_parts(
212            u64::from(block_height),
213            u64::from(tx_index),
214            u64::from(output_index),
215        )
216        .map_err(|e| anyhow::anyhow!("Failed to convert to short channel ID: {e:?}"))
217    }
218}
219
220impl Drop for GatewayLdkClient {
221    fn drop(&mut self) {
222        self.task_group.shutdown();
223
224        info!("Stopping LDK Node...");
225        if let Err(e) = self.node.stop() {
226            error!(?e, "Failed to stop LDK Node");
227        } else {
228            info!("LDK Node stopped.");
229        }
230    }
231}
232
233#[async_trait]
234impl ILnRpcClient for GatewayLdkClient {
235    async fn info(&self) -> Result<GetNodeInfoResponse, LightningRpcError> {
236        // HACK: https://github.com/lightningdevkit/ldk-node/issues/339 when running in devimint
237        // to speed up tests
238        if is_env_var_set("FM_IN_DEVIMINT") {
239            block_in_place(|| {
240                let _ = self.node.sync_wallets();
241            });
242        }
243        let node_status = self.node.status();
244
245        let Some(esplora_chain_tip_block_summary) = self
246            .esplora_client
247            .get_blocks(None)
248            .await
249            .map_err(|e| LightningRpcError::FailedToGetNodeInfo {
250                failure_reason: format!("Failed get chain tip block summary: {e:?}"),
251            })?
252            .into_iter()
253            .next()
254        else {
255            return Err(LightningRpcError::FailedToGetNodeInfo {
256                failure_reason:
257                    "Failed to get chain tip block summary (empty block list was returned)"
258                        .to_string(),
259            });
260        };
261
262        let esplora_chain_tip_block_height = esplora_chain_tip_block_summary.time.height;
263        let ldk_block_height: u32 = node_status.current_best_block.height;
264        let synced_to_chain = esplora_chain_tip_block_height == ldk_block_height;
265
266        assert!(
267            esplora_chain_tip_block_height >= ldk_block_height,
268            "LDK Block Height is in the future"
269        );
270
271        Ok(GetNodeInfoResponse {
272            pub_key: self.node.node_id(),
273            alias: match self.node.node_alias() {
274                Some(alias) => alias.to_string(),
275                None => format!("LDK Fedimint Gateway Node {}", self.node.node_id()),
276            },
277            network: self.node.config().network.to_string(),
278            block_height: ldk_block_height,
279            synced_to_chain,
280        })
281    }
282
283    async fn routehints(
284        &self,
285        _num_route_hints: usize,
286    ) -> Result<GetRouteHintsResponse, LightningRpcError> {
287        // TODO: Return real route hints. Not strictly necessary but would be nice to
288        // have.
289        Ok(GetRouteHintsResponse {
290            route_hints: vec![],
291        })
292    }
293
294    async fn pay(
295        &self,
296        invoice: Bolt11Invoice,
297        max_delay: u64,
298        max_fee: Amount,
299    ) -> Result<PayInvoiceResponse, LightningRpcError> {
300        let payment_id = PaymentId(*invoice.payment_hash().as_byte_array());
301
302        // Lock by the payment hash to prevent multiple simultaneous calls with the same
303        // invoice from executing. This prevents `ldk-node::Bolt11Payment::send()` from
304        // being called multiple times with the same invoice. This is important because
305        // `ldk-node::Bolt11Payment::send()` is not idempotent, but this function must
306        // be idempotent.
307        let _payment_lock_guard = self
308            .outbound_lightning_payment_lock_pool
309            .async_lock(payment_id)
310            .await;
311
312        // If a payment is not known to the node we can initiate it, and if it is known
313        // we can skip calling `ldk-node::Bolt11Payment::send()` and wait for the
314        // payment to complete. The lock guard above guarantees that this block is only
315        // executed once at a time for a given payment hash, ensuring that there is no
316        // race condition between checking if a payment is known and initiating a new
317        // payment if it isn't.
318        if self.node.payment(&payment_id).is_none() {
319            assert_eq!(
320                self.node
321                    .bolt11_payment()
322                    .send(
323                        &invoice,
324                        Some(SendingParameters {
325                            max_total_routing_fee_msat: Some(Some(max_fee.msats)),
326                            max_total_cltv_expiry_delta: Some(max_delay as u32),
327                            max_path_count: None,
328                            max_channel_saturation_power_of_half: None,
329                        }),
330                    )
331                    // TODO: Investigate whether all error types returned by `Bolt11Payment::send()`
332                    // result in idempotency.
333                    .map_err(|e| LightningRpcError::FailedPayment {
334                        failure_reason: format!("LDK payment failed to initialize: {e:?}"),
335                    })?,
336                payment_id
337            );
338        }
339
340        // TODO: Find a way to avoid looping/polling to know when a payment is
341        // completed. `ldk-node` provides `PaymentSuccessful` and `PaymentFailed`
342        // events, but interacting with the node event queue here isn't
343        // straightforward.
344        loop {
345            if let Some(payment_details) = self.node.payment(&payment_id) {
346                match payment_details.status {
347                    PaymentStatus::Pending => {}
348                    PaymentStatus::Succeeded => {
349                        if let PaymentKind::Bolt11 {
350                            preimage: Some(preimage),
351                            ..
352                        } = payment_details.kind
353                        {
354                            return Ok(PayInvoiceResponse {
355                                preimage: Preimage(preimage.0),
356                            });
357                        }
358                    }
359                    PaymentStatus::Failed => {
360                        return Err(LightningRpcError::FailedPayment {
361                            failure_reason: "LDK payment failed".to_string(),
362                        });
363                    }
364                }
365            }
366            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
367        }
368    }
369
370    async fn route_htlcs<'a>(
371        mut self: Box<Self>,
372        _task_group: &TaskGroup,
373    ) -> Result<(RouteHtlcStream<'a>, Arc<dyn ILnRpcClient>), LightningRpcError> {
374        let route_htlc_stream = match self.htlc_stream_receiver_or.take() {
375            Some(stream) => Ok(Box::pin(ReceiverStream::new(stream))),
376            None => Err(LightningRpcError::FailedToRouteHtlcs {
377                failure_reason:
378                    "Stream does not exist. Likely was already taken by calling `route_htlcs()`."
379                        .to_string(),
380            }),
381        }?;
382
383        Ok((route_htlc_stream, Arc::new(*self)))
384    }
385
386    async fn complete_htlc(&self, htlc: InterceptPaymentResponse) -> Result<(), LightningRpcError> {
387        let InterceptPaymentResponse {
388            action,
389            payment_hash,
390            incoming_chan_id: _,
391            htlc_id: _,
392        } = htlc;
393
394        let ph = PaymentHash(*payment_hash.clone().as_byte_array());
395
396        // TODO: Get the actual amount from the LDK node. Probably makes the
397        // most sense to pipe it through the `InterceptHtlcResponse` struct.
398        // This value is only used by `ldk-node` to ensure that the amount
399        // claimed isn't less than the amount expected, but we've already
400        // verified that the amount is correct when we intercepted the payment.
401        let claimable_amount_msat = 999_999_999_999_999;
402
403        let ph_hex_str = hex::encode(payment_hash);
404
405        if let PaymentAction::Settle(preimage) = action {
406            self.node
407                .bolt11_payment()
408                .claim_for_hash(ph, claimable_amount_msat, PaymentPreimage(preimage.0))
409                .map_err(|_| LightningRpcError::FailedToCompleteHtlc {
410                    failure_reason: format!("Failed to claim LDK payment with hash {ph_hex_str}"),
411                })?;
412        } else {
413            error!("Unwinding payment with hash {ph_hex_str} because the action was not `Settle`");
414            self.node.bolt11_payment().fail_for_hash(ph).map_err(|_| {
415                LightningRpcError::FailedToCompleteHtlc {
416                    failure_reason: format!("Failed to unwind LDK payment with hash {ph_hex_str}"),
417                }
418            })?;
419        }
420
421        return Ok(());
422    }
423
424    async fn create_invoice(
425        &self,
426        create_invoice_request: CreateInvoiceRequest,
427    ) -> Result<CreateInvoiceResponse, LightningRpcError> {
428        let payment_hash_or = if let Some(payment_hash) = create_invoice_request.payment_hash {
429            let ph = PaymentHash(*payment_hash.as_byte_array());
430            Some(ph)
431        } else {
432            None
433        };
434
435        // Currently `ldk-node` only supports direct descriptions.
436        // See https://github.com/lightningdevkit/ldk-node/issues/325.
437        // TODO: Once the above issue is resolved, we should support
438        // description hashes as well.
439        let description_str = match create_invoice_request.description {
440            Some(InvoiceDescription::Direct(desc)) => desc,
441            _ => String::new(),
442        };
443
444        let invoice = match payment_hash_or {
445            Some(payment_hash) => self.node.bolt11_payment().receive_for_hash(
446                create_invoice_request.amount_msat,
447                description_str.as_str(),
448                create_invoice_request.expiry_secs,
449                payment_hash,
450            ),
451            None => self.node.bolt11_payment().receive(
452                create_invoice_request.amount_msat,
453                description_str.as_str(),
454                create_invoice_request.expiry_secs,
455            ),
456        }
457        .map_err(|e| LightningRpcError::FailedToGetInvoice {
458            failure_reason: e.to_string(),
459        })?;
460
461        Ok(CreateInvoiceResponse {
462            invoice: invoice.to_string(),
463        })
464    }
465
466    async fn get_ln_onchain_address(
467        &self,
468    ) -> Result<GetLnOnchainAddressResponse, LightningRpcError> {
469        self.node
470            .onchain_payment()
471            .new_address()
472            .map(|address| GetLnOnchainAddressResponse {
473                address: address.to_string(),
474            })
475            .map_err(|e| LightningRpcError::FailedToGetLnOnchainAddress {
476                failure_reason: e.to_string(),
477            })
478    }
479
480    async fn send_onchain(
481        &self,
482        SendOnchainPayload {
483            address,
484            amount,
485            // TODO: Respect this fee rate once `ldk-node` supports setting a custom fee rate.
486            // This work is planned to be in `ldk-node` v0.4 and is tracked here:
487            // https://github.com/lightningdevkit/ldk-node/issues/176
488            fee_rate_sats_per_vbyte: _,
489        }: SendOnchainPayload,
490    ) -> Result<SendOnchainResponse, LightningRpcError> {
491        let onchain = self.node.onchain_payment();
492
493        let txid = match amount {
494            BitcoinAmountOrAll::All => onchain.send_all_to_address(&address.assume_checked()),
495            BitcoinAmountOrAll::Amount(amount_sats) => {
496                onchain.send_to_address(&address.assume_checked(), amount_sats.to_sat())
497            }
498        }
499        .map_err(|e| LightningRpcError::FailedToWithdrawOnchain {
500            failure_reason: e.to_string(),
501        })?;
502
503        Ok(SendOnchainResponse {
504            txid: txid.to_string(),
505        })
506    }
507
508    async fn open_channel(
509        &self,
510        OpenChannelPayload {
511            pubkey,
512            host,
513            channel_size_sats,
514            push_amount_sats,
515        }: OpenChannelPayload,
516    ) -> Result<OpenChannelResponse, LightningRpcError> {
517        let push_amount_msats_or = if push_amount_sats == 0 {
518            None
519        } else {
520            Some(push_amount_sats * 1000)
521        };
522
523        let user_channel_id = self
524            .node
525            .open_announced_channel(
526                pubkey,
527                SocketAddress::from_str(&host).map_err(|e| {
528                    LightningRpcError::FailedToConnectToPeer {
529                        failure_reason: e.to_string(),
530                    }
531                })?,
532                channel_size_sats,
533                push_amount_msats_or,
534                None,
535            )
536            .map_err(|e| LightningRpcError::FailedToOpenChannel {
537                failure_reason: e.to_string(),
538            })?;
539
540        // The channel isn't always visible immediately, so we need to poll for it.
541        for _ in 0..10 {
542            let funding_txid_or = self
543                .node
544                .list_channels()
545                .iter()
546                .find(|channel| channel.user_channel_id == user_channel_id)
547                .and_then(|channel| channel.funding_txo)
548                .map(|funding_txo| funding_txo.txid);
549
550            if let Some(funding_txid) = funding_txid_or {
551                return Ok(OpenChannelResponse {
552                    funding_txid: funding_txid.to_string(),
553                });
554            }
555
556            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
557        }
558
559        Err(LightningRpcError::FailedToOpenChannel {
560            failure_reason: "Channel could not be opened".to_string(),
561        })
562    }
563
564    async fn close_channels_with_peer(
565        &self,
566        CloseChannelsWithPeerPayload { pubkey }: CloseChannelsWithPeerPayload,
567    ) -> Result<CloseChannelsWithPeerResponse, LightningRpcError> {
568        let mut num_channels_closed = 0;
569
570        for channel_with_peer in self
571            .node
572            .list_channels()
573            .iter()
574            .filter(|channel| channel.counterparty_node_id == pubkey)
575        {
576            if self
577                .node
578                .close_channel(&channel_with_peer.user_channel_id, pubkey)
579                .is_ok()
580            {
581                num_channels_closed += 1;
582            }
583        }
584
585        Ok(CloseChannelsWithPeerResponse {
586            num_channels_closed,
587        })
588    }
589
590    async fn list_active_channels(&self) -> Result<Vec<ChannelInfo>, LightningRpcError> {
591        let mut channels = Vec::new();
592
593        for channel_details in self
594            .node
595            .list_channels()
596            .iter()
597            .filter(|channel| channel.is_usable)
598        {
599            channels.push(ChannelInfo {
600                remote_pubkey: channel_details.counterparty_node_id,
601                channel_size_sats: channel_details.channel_value_sats,
602                outbound_liquidity_sats: channel_details.outbound_capacity_msat / 1000,
603                inbound_liquidity_sats: channel_details.inbound_capacity_msat / 1000,
604                short_channel_id: match channel_details.funding_txo {
605                    Some(funding_txo) => self.outpoint_to_scid(funding_txo).await.unwrap_or(0),
606                    None => 0,
607                },
608            });
609        }
610
611        Ok(channels)
612    }
613
614    async fn get_balances(&self) -> Result<GetBalancesResponse, LightningRpcError> {
615        let balances = self.node.list_balances();
616        let channel_lists = self
617            .node
618            .list_channels()
619            .into_iter()
620            .filter(|chan| chan.is_usable)
621            .collect::<Vec<_>>();
622        // map and get the total inbound_capacity_msat in the channels
623        let total_inbound_liquidity_balance_msat: u64 = channel_lists
624            .iter()
625            .map(|channel| channel.inbound_capacity_msat)
626            .sum();
627
628        Ok(GetBalancesResponse {
629            onchain_balance_sats: balances.total_onchain_balance_sats,
630            lightning_balance_msats: balances.total_lightning_balance_sats * 1000,
631            inbound_lightning_liquidity_msats: total_inbound_liquidity_balance_msat,
632        })
633    }
634}