fedimint_lightning/
ldk.rs

1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bitcoin::hashes::Hash;
8use bitcoin::{Network, OutPoint};
9use fedimint_bip39::Mnemonic;
10use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
11use fedimint_core::envs::{is_env_var_set, BitcoinRpcConfig};
12use fedimint_core::task::{block_in_place, TaskGroup, TaskHandle};
13use fedimint_core::util::SafeUrl;
14use fedimint_core::{Amount, BitcoinAmountOrAll};
15use fedimint_ln_common::contracts::Preimage;
16use ldk_node::lightning::ln::msgs::SocketAddress;
17use ldk_node::lightning::ln::PaymentHash;
18use ldk_node::lightning::routing::gossip::NodeAlias;
19use ldk_node::payment::{PaymentKind, PaymentStatus, SendingParameters};
20use lightning::ln::channelmanager::PaymentId;
21use lightning::ln::PaymentPreimage;
22use lightning::util::scid_utils::scid_from_parts;
23use lightning_invoice::Bolt11Invoice;
24use tokio::sync::mpsc::Sender;
25use tokio_stream::wrappers::ReceiverStream;
26use tracing::{error, info};
27
28use super::{
29    ChannelInfo, ILnRpcClient, LightningRpcError, ListActiveChannelsResponse, RouteHtlcStream,
30};
31use crate::{
32    CloseChannelsWithPeerRequest, CloseChannelsWithPeerResponse, CreateInvoiceRequest,
33    CreateInvoiceResponse, GetBalancesResponse, GetLnOnchainAddressResponse, GetNodeInfoResponse,
34    GetRouteHintsResponse, InterceptPaymentRequest, InterceptPaymentResponse, InvoiceDescription,
35    OpenChannelRequest, OpenChannelResponse, PayInvoiceResponse, PaymentAction, SendOnchainRequest,
36    SendOnchainResponse,
37};
38
39pub enum GatewayLdkChainSourceConfig {
40    Bitcoind { server_url: SafeUrl },
41    Esplora { server_url: SafeUrl },
42}
43
44impl GatewayLdkChainSourceConfig {
45    fn bitcoin_rpc_config(&self) -> BitcoinRpcConfig {
46        match self {
47            Self::Bitcoind { server_url } => BitcoinRpcConfig {
48                kind: "bitcoind".to_string(),
49                url: server_url.clone(),
50            },
51            Self::Esplora { server_url } => BitcoinRpcConfig {
52                kind: "esplora".to_string(),
53                url: server_url.clone(),
54            },
55        }
56    }
57}
58
59pub struct GatewayLdkClient {
60    /// The underlying lightning node.
61    node: Arc<ldk_node::Node>,
62
63    /// The client for querying data about the blockchain.
64    bitcoind_rpc: DynBitcoindRpc,
65
66    task_group: TaskGroup,
67
68    /// The HTLC stream, until it is taken by calling
69    /// `ILnRpcClient::route_htlcs`.
70    htlc_stream_receiver_or: Option<tokio::sync::mpsc::Receiver<InterceptPaymentRequest>>,
71
72    /// Lock pool used to ensure that our implementation of `ILnRpcClient::pay`
73    /// doesn't allow for multiple simultaneous calls with the same invoice to
74    /// execute in parallel. This helps ensure that the function is idempotent.
75    outbound_lightning_payment_lock_pool: lockable::LockPool<PaymentId>,
76}
77
78impl std::fmt::Debug for GatewayLdkClient {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("GatewayLdkClient").finish_non_exhaustive()
81    }
82}
83
84impl GatewayLdkClient {
85    /// Creates a new `GatewayLdkClient` instance and starts the underlying
86    /// lightning node. All resources, including the lightning node, will be
87    /// cleaned up when the returned `GatewayLdkClient` instance is dropped.
88    /// There's no need to manually stop the node.
89    pub fn new(
90        data_dir: &Path,
91        chain_source_config: GatewayLdkChainSourceConfig,
92        network: Network,
93        lightning_port: u16,
94        mnemonic: Mnemonic,
95        runtime: Arc<tokio::runtime::Runtime>,
96    ) -> anyhow::Result<Self> {
97        // In devimint, gateways must allow for other gateways to open channels to them.
98        // To ensure this works, we must set a node alias to signal to ldk-node that we
99        // should accept incoming public channels. However, on mainnet we can disable
100        // this for better privacy.
101        let node_alias = if network == Network::Bitcoin {
102            None
103        } else {
104            let alias = format!("{network} LDK Gateway");
105            let mut bytes = [0u8; 32];
106            bytes[..alias.as_bytes().len()].copy_from_slice(alias.as_bytes());
107            Some(NodeAlias(bytes))
108        };
109
110        let mut node_builder = ldk_node::Builder::from_config(ldk_node::config::Config {
111            network,
112            listening_addresses: Some(vec![SocketAddress::TcpIpV4 {
113                addr: [0, 0, 0, 0],
114                port: lightning_port,
115            }]),
116            node_alias,
117            ..Default::default()
118        });
119
120        node_builder.set_entropy_bip39_mnemonic(mnemonic, None);
121
122        let bitcoind_rpc = create_bitcoind(&chain_source_config.bitcoin_rpc_config())?;
123
124        match chain_source_config {
125            GatewayLdkChainSourceConfig::Bitcoind { server_url } => {
126                node_builder.set_chain_source_bitcoind_rpc(
127                    server_url
128                        .host_str()
129                        .expect("Could not retrieve host from bitcoind RPC url")
130                        .to_string(),
131                    server_url
132                        .port()
133                        .expect("Could not retrieve port from bitcoind RPC url"),
134                    server_url.username().to_string(),
135                    server_url.password().unwrap_or_default().to_string(),
136                );
137            }
138            GatewayLdkChainSourceConfig::Esplora { server_url } => {
139                node_builder.set_chain_source_esplora(server_url.to_string(), None);
140            }
141        };
142        let Some(data_dir_str) = data_dir.to_str() else {
143            return Err(anyhow::anyhow!("Invalid data dir path"));
144        };
145        node_builder.set_storage_dir_path(data_dir_str.to_string());
146
147        let node = Arc::new(node_builder.build()?);
148        node.start_with_runtime(runtime).map_err(|e| {
149            error!(?e, "Failed to start LDK Node");
150            LightningRpcError::FailedToConnect
151        })?;
152
153        let (htlc_stream_sender, htlc_stream_receiver) = tokio::sync::mpsc::channel(1024);
154        let task_group = TaskGroup::new();
155
156        let node_clone = node.clone();
157        task_group.spawn("ldk lightning node event handler", |handle| async move {
158            loop {
159                Self::handle_next_event(&node_clone, &htlc_stream_sender, &handle).await;
160            }
161        });
162
163        Ok(GatewayLdkClient {
164            node,
165            bitcoind_rpc,
166            task_group,
167            htlc_stream_receiver_or: Some(htlc_stream_receiver),
168            outbound_lightning_payment_lock_pool: lockable::LockPool::new(),
169        })
170    }
171
172    async fn handle_next_event(
173        node: &ldk_node::Node,
174        htlc_stream_sender: &Sender<InterceptPaymentRequest>,
175        handle: &TaskHandle,
176    ) {
177        // We manually check for task termination in case we receive a payment while the
178        // task is shutting down. In that case, we want to finish the payment
179        // before shutting this task down.
180        let event = tokio::select! {
181            event = node.next_event_async() => {
182                event
183            }
184            () = handle.make_shutdown_rx() => {
185                return;
186            }
187        };
188
189        if let ldk_node::Event::PaymentClaimable {
190            payment_id: _,
191            payment_hash,
192            claimable_amount_msat,
193            claim_deadline,
194        } = event
195        {
196            if let Err(e) = htlc_stream_sender
197                .send(InterceptPaymentRequest {
198                    payment_hash: Hash::from_slice(&payment_hash.0).expect("Failed to create Hash"),
199                    amount_msat: claimable_amount_msat,
200                    expiry: claim_deadline.unwrap_or_default(),
201                    short_channel_id: None,
202                    incoming_chan_id: 0,
203                    htlc_id: 0,
204                })
205                .await
206            {
207                error!(?e, "Failed send InterceptHtlcRequest to stream");
208            }
209        }
210
211        // The `PaymentClaimable` event is the only event type that we are interested
212        // in. We can safely ignore all other events.
213        node.event_handled();
214    }
215
216    /// Converts a transaction outpoint to a short channel ID by querying the
217    /// blockchain.
218    async fn outpoint_to_scid(&self, funding_txo: OutPoint) -> anyhow::Result<u64> {
219        let block_hash = self
220            .bitcoind_rpc
221            .get_txout_proof(funding_txo.txid)
222            .await?
223            .block_header
224            .block_hash();
225
226        let block_height = self
227            .bitcoind_rpc
228            .get_tx_block_height(&funding_txo.txid)
229            .await?
230            .ok_or(anyhow::anyhow!("Failed to get block height"))?;
231
232        let block = self.bitcoind_rpc.get_block(&block_hash).await?;
233
234        let tx_index = block
235            .txdata
236            .iter()
237            .enumerate()
238            .find(|(_, tx)| tx.compute_txid() == funding_txo.txid)
239            .ok_or(anyhow::anyhow!("Failed to find transaction"))?
240            .0 as u32;
241
242        let output_index = funding_txo.vout;
243
244        scid_from_parts(block_height, u64::from(tx_index), u64::from(output_index))
245            .map_err(|e| anyhow::anyhow!("Failed to convert to short channel ID: {e:?}"))
246    }
247}
248
249impl Drop for GatewayLdkClient {
250    fn drop(&mut self) {
251        self.task_group.shutdown();
252
253        info!("Stopping LDK Node...");
254        if let Err(e) = self.node.stop() {
255            error!(?e, "Failed to stop LDK Node");
256        } else {
257            info!("LDK Node stopped.");
258        }
259    }
260}
261
262#[async_trait]
263impl ILnRpcClient for GatewayLdkClient {
264    async fn info(&self) -> Result<GetNodeInfoResponse, LightningRpcError> {
265        // HACK: https://github.com/lightningdevkit/ldk-node/issues/339 when running in devimint
266        // to speed up tests
267        if is_env_var_set("FM_IN_DEVIMINT") {
268            block_in_place(|| {
269                let _ = self.node.sync_wallets();
270            });
271        }
272        let node_status = self.node.status();
273
274        let chain_tip_block_height =
275            u32::try_from(self.bitcoind_rpc.get_block_count().await.map_err(|e| {
276                LightningRpcError::FailedToGetNodeInfo {
277                    failure_reason: format!("Failed to get block count from chain source: {e}"),
278                }
279            })?)
280            .expect("Failed to convert block count to u32")
281                - 1;
282        let ldk_block_height = node_status.current_best_block.height;
283        let synced_to_chain = chain_tip_block_height == ldk_block_height;
284
285        assert!(
286            chain_tip_block_height >= ldk_block_height,
287            "LDK Block Height is in the future"
288        );
289
290        Ok(GetNodeInfoResponse {
291            pub_key: self.node.node_id(),
292            alias: match self.node.node_alias() {
293                Some(alias) => alias.to_string(),
294                None => format!("LDK Fedimint Gateway Node {}", self.node.node_id()),
295            },
296            network: self.node.config().network.to_string(),
297            block_height: ldk_block_height,
298            synced_to_chain,
299        })
300    }
301
302    async fn routehints(
303        &self,
304        _num_route_hints: usize,
305    ) -> Result<GetRouteHintsResponse, LightningRpcError> {
306        // `ILnRpcClient::routehints()` is currently only ever used for LNv1 payment
307        // receives and will be removed when we switch to LNv2. The LDK gateway will
308        // never support LNv1 payment receives, only LNv2 payment receives, which
309        // require that the gateway's lightning node generates invoices rather than the
310        // fedimint client, so it is able to insert the proper route hints on its own.
311        Ok(GetRouteHintsResponse {
312            route_hints: vec![],
313        })
314    }
315
316    async fn pay(
317        &self,
318        invoice: Bolt11Invoice,
319        max_delay: u64,
320        max_fee: Amount,
321    ) -> Result<PayInvoiceResponse, LightningRpcError> {
322        let payment_id = PaymentId(*invoice.payment_hash().as_byte_array());
323
324        // Lock by the payment hash to prevent multiple simultaneous calls with the same
325        // invoice from executing. This prevents `ldk-node::Bolt11Payment::send()` from
326        // being called multiple times with the same invoice. This is important because
327        // `ldk-node::Bolt11Payment::send()` is not idempotent, but this function must
328        // be idempotent.
329        let _payment_lock_guard = self
330            .outbound_lightning_payment_lock_pool
331            .async_lock(payment_id)
332            .await;
333
334        // If a payment is not known to the node we can initiate it, and if it is known
335        // we can skip calling `ldk-node::Bolt11Payment::send()` and wait for the
336        // payment to complete. The lock guard above guarantees that this block is only
337        // executed once at a time for a given payment hash, ensuring that there is no
338        // race condition between checking if a payment is known and initiating a new
339        // payment if it isn't.
340        if self.node.payment(&payment_id).is_none() {
341            assert_eq!(
342                self.node
343                    .bolt11_payment()
344                    .send(
345                        &invoice,
346                        Some(SendingParameters {
347                            max_total_routing_fee_msat: Some(Some(max_fee.msats)),
348                            max_total_cltv_expiry_delta: Some(max_delay as u32),
349                            max_path_count: None,
350                            max_channel_saturation_power_of_half: None,
351                        }),
352                    )
353                    // TODO: Investigate whether all error types returned by `Bolt11Payment::send()`
354                    // result in idempotency.
355                    .map_err(|e| LightningRpcError::FailedPayment {
356                        failure_reason: format!("LDK payment failed to initialize: {e:?}"),
357                    })?,
358                payment_id
359            );
360        }
361
362        // TODO: Find a way to avoid looping/polling to know when a payment is
363        // completed. `ldk-node` provides `PaymentSuccessful` and `PaymentFailed`
364        // events, but interacting with the node event queue here isn't
365        // straightforward.
366        loop {
367            if let Some(payment_details) = self.node.payment(&payment_id) {
368                match payment_details.status {
369                    PaymentStatus::Pending => {}
370                    PaymentStatus::Succeeded => {
371                        if let PaymentKind::Bolt11 {
372                            preimage: Some(preimage),
373                            ..
374                        } = payment_details.kind
375                        {
376                            return Ok(PayInvoiceResponse {
377                                preimage: Preimage(preimage.0),
378                            });
379                        }
380                    }
381                    PaymentStatus::Failed => {
382                        return Err(LightningRpcError::FailedPayment {
383                            failure_reason: "LDK payment failed".to_string(),
384                        });
385                    }
386                }
387            }
388            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
389        }
390    }
391
392    async fn route_htlcs<'a>(
393        mut self: Box<Self>,
394        _task_group: &TaskGroup,
395    ) -> Result<(RouteHtlcStream<'a>, Arc<dyn ILnRpcClient>), LightningRpcError> {
396        let route_htlc_stream = match self.htlc_stream_receiver_or.take() {
397            Some(stream) => Ok(Box::pin(ReceiverStream::new(stream))),
398            None => Err(LightningRpcError::FailedToRouteHtlcs {
399                failure_reason:
400                    "Stream does not exist. Likely was already taken by calling `route_htlcs()`."
401                        .to_string(),
402            }),
403        }?;
404
405        Ok((route_htlc_stream, Arc::new(*self)))
406    }
407
408    async fn complete_htlc(&self, htlc: InterceptPaymentResponse) -> Result<(), LightningRpcError> {
409        let InterceptPaymentResponse {
410            action,
411            payment_hash,
412            incoming_chan_id: _,
413            htlc_id: _,
414        } = htlc;
415
416        let ph = PaymentHash(*payment_hash.clone().as_byte_array());
417
418        // TODO: Get the actual amount from the LDK node. Probably makes the
419        // most sense to pipe it through the `InterceptHtlcResponse` struct.
420        // This value is only used by `ldk-node` to ensure that the amount
421        // claimed isn't less than the amount expected, but we've already
422        // verified that the amount is correct when we intercepted the payment.
423        let claimable_amount_msat = 999_999_999_999_999;
424
425        let ph_hex_str = hex::encode(payment_hash);
426
427        if let PaymentAction::Settle(preimage) = action {
428            self.node
429                .bolt11_payment()
430                .claim_for_hash(ph, claimable_amount_msat, PaymentPreimage(preimage.0))
431                .map_err(|_| LightningRpcError::FailedToCompleteHtlc {
432                    failure_reason: format!("Failed to claim LDK payment with hash {ph_hex_str}"),
433                })?;
434        } else {
435            error!("Unwinding payment with hash {ph_hex_str} because the action was not `Settle`");
436            self.node.bolt11_payment().fail_for_hash(ph).map_err(|_| {
437                LightningRpcError::FailedToCompleteHtlc {
438                    failure_reason: format!("Failed to unwind LDK payment with hash {ph_hex_str}"),
439                }
440            })?;
441        }
442
443        return Ok(());
444    }
445
446    async fn create_invoice(
447        &self,
448        create_invoice_request: CreateInvoiceRequest,
449    ) -> Result<CreateInvoiceResponse, LightningRpcError> {
450        let payment_hash_or = if let Some(payment_hash) = create_invoice_request.payment_hash {
451            let ph = PaymentHash(*payment_hash.as_byte_array());
452            Some(ph)
453        } else {
454            None
455        };
456
457        // Currently `ldk-node` only supports direct descriptions.
458        // See https://github.com/lightningdevkit/ldk-node/issues/325.
459        // TODO: Once the above issue is resolved, we should support
460        // description hashes as well.
461        let description_str = match create_invoice_request.description {
462            Some(InvoiceDescription::Direct(desc)) => desc,
463            _ => String::new(),
464        };
465
466        let invoice = match payment_hash_or {
467            Some(payment_hash) => self.node.bolt11_payment().receive_for_hash(
468                create_invoice_request.amount_msat,
469                description_str.as_str(),
470                create_invoice_request.expiry_secs,
471                payment_hash,
472            ),
473            None => self.node.bolt11_payment().receive(
474                create_invoice_request.amount_msat,
475                description_str.as_str(),
476                create_invoice_request.expiry_secs,
477            ),
478        }
479        .map_err(|e| LightningRpcError::FailedToGetInvoice {
480            failure_reason: e.to_string(),
481        })?;
482
483        Ok(CreateInvoiceResponse {
484            invoice: invoice.to_string(),
485        })
486    }
487
488    async fn get_ln_onchain_address(
489        &self,
490    ) -> Result<GetLnOnchainAddressResponse, LightningRpcError> {
491        self.node
492            .onchain_payment()
493            .new_address()
494            .map(|address| GetLnOnchainAddressResponse {
495                address: address.to_string(),
496            })
497            .map_err(|e| LightningRpcError::FailedToGetLnOnchainAddress {
498                failure_reason: e.to_string(),
499            })
500    }
501
502    async fn send_onchain(
503        &self,
504        SendOnchainRequest {
505            address,
506            amount,
507            // TODO: Respect this fee rate once `ldk-node` supports setting a custom fee rate.
508            // This work is planned to be in `ldk-node` v0.4 and is tracked here:
509            // https://github.com/lightningdevkit/ldk-node/issues/176
510            fee_rate_sats_per_vbyte: _,
511        }: SendOnchainRequest,
512    ) -> Result<SendOnchainResponse, LightningRpcError> {
513        let onchain = self.node.onchain_payment();
514
515        let txid = match amount {
516            BitcoinAmountOrAll::All => onchain.send_all_to_address(&address.assume_checked()),
517            BitcoinAmountOrAll::Amount(amount_sats) => {
518                onchain.send_to_address(&address.assume_checked(), amount_sats.to_sat())
519            }
520        }
521        .map_err(|e| LightningRpcError::FailedToWithdrawOnchain {
522            failure_reason: e.to_string(),
523        })?;
524
525        Ok(SendOnchainResponse {
526            txid: txid.to_string(),
527        })
528    }
529
530    async fn open_channel(
531        &self,
532        OpenChannelRequest {
533            pubkey,
534            host,
535            channel_size_sats,
536            push_amount_sats,
537        }: OpenChannelRequest,
538    ) -> Result<OpenChannelResponse, LightningRpcError> {
539        let push_amount_msats_or = if push_amount_sats == 0 {
540            None
541        } else {
542            Some(push_amount_sats * 1000)
543        };
544
545        let user_channel_id = self
546            .node
547            .open_announced_channel(
548                pubkey,
549                SocketAddress::from_str(&host).map_err(|e| {
550                    LightningRpcError::FailedToConnectToPeer {
551                        failure_reason: e.to_string(),
552                    }
553                })?,
554                channel_size_sats,
555                push_amount_msats_or,
556                None,
557            )
558            .map_err(|e| LightningRpcError::FailedToOpenChannel {
559                failure_reason: e.to_string(),
560            })?;
561
562        // The channel isn't always visible immediately, so we need to poll for it.
563        for _ in 0..10 {
564            let funding_txid_or = self
565                .node
566                .list_channels()
567                .iter()
568                .find(|channel| channel.user_channel_id == user_channel_id)
569                .and_then(|channel| channel.funding_txo)
570                .map(|funding_txo| funding_txo.txid);
571
572            if let Some(funding_txid) = funding_txid_or {
573                return Ok(OpenChannelResponse {
574                    funding_txid: funding_txid.to_string(),
575                });
576            }
577
578            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
579        }
580
581        Err(LightningRpcError::FailedToOpenChannel {
582            failure_reason: "Channel could not be opened".to_string(),
583        })
584    }
585
586    async fn close_channels_with_peer(
587        &self,
588        CloseChannelsWithPeerRequest { pubkey }: CloseChannelsWithPeerRequest,
589    ) -> Result<CloseChannelsWithPeerResponse, LightningRpcError> {
590        let mut num_channels_closed = 0;
591
592        for channel_with_peer in self
593            .node
594            .list_channels()
595            .iter()
596            .filter(|channel| channel.counterparty_node_id == pubkey)
597        {
598            if self
599                .node
600                .close_channel(&channel_with_peer.user_channel_id, pubkey)
601                .is_ok()
602            {
603                num_channels_closed += 1;
604            }
605        }
606
607        Ok(CloseChannelsWithPeerResponse {
608            num_channels_closed,
609        })
610    }
611
612    async fn list_active_channels(&self) -> Result<ListActiveChannelsResponse, LightningRpcError> {
613        let mut channels = Vec::new();
614
615        for channel_details in self
616            .node
617            .list_channels()
618            .iter()
619            .filter(|channel| channel.is_usable)
620        {
621            channels.push(ChannelInfo {
622                remote_pubkey: channel_details.counterparty_node_id,
623                channel_size_sats: channel_details.channel_value_sats,
624                outbound_liquidity_sats: channel_details.outbound_capacity_msat / 1000,
625                inbound_liquidity_sats: channel_details.inbound_capacity_msat / 1000,
626                short_channel_id: match channel_details.funding_txo {
627                    Some(funding_txo) => self.outpoint_to_scid(funding_txo).await.unwrap_or(0),
628                    None => 0,
629                },
630            });
631        }
632
633        Ok(ListActiveChannelsResponse { channels })
634    }
635
636    async fn get_balances(&self) -> Result<GetBalancesResponse, LightningRpcError> {
637        let balances = self.node.list_balances();
638        let channel_lists = self
639            .node
640            .list_channels()
641            .into_iter()
642            .filter(|chan| chan.is_usable)
643            .collect::<Vec<_>>();
644        // map and get the total inbound_capacity_msat in the channels
645        let total_inbound_liquidity_balance_msat: u64 = channel_lists
646            .iter()
647            .map(|channel| channel.inbound_capacity_msat)
648            .sum();
649
650        Ok(GetBalancesResponse {
651            onchain_balance_sats: balances.total_onchain_balance_sats,
652            lightning_balance_msats: balances.total_lightning_balance_sats * 1000,
653            inbound_lightning_liquidity_msats: total_inbound_liquidity_balance_msat,
654        })
655    }
656}