use std::ops::ControlFlow;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bitcoin::hashes::Hash;
use bitcoin::{Network, OutPoint};
use fedimint_bip39::Mnemonic;
use fedimint_core::envs::is_env_var_set;
use fedimint_core::task::{block_in_place, TaskGroup, TaskHandle};
use fedimint_core::{Amount, BitcoinAmountOrAll};
use fedimint_ln_common::contracts::Preimage;
use ldk_node::config::EsploraSyncConfig;
use ldk_node::lightning::ln::msgs::SocketAddress;
use ldk_node::lightning::ln::PaymentHash;
use ldk_node::lightning::routing::gossip::NodeAlias;
use ldk_node::payment::{PaymentKind, PaymentStatus, SendingParameters};
use lightning::ln::channelmanager::PaymentId;
use lightning::ln::PaymentPreimage;
use lightning::util::scid_utils::scid_from_parts;
use lightning_invoice::Bolt11Invoice;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info};

use super::{ChannelInfo, ILnRpcClient, LightningRpcError, RouteHtlcStream};
use crate::lightning::{
    CloseChannelsWithPeerResponse, CreateInvoiceRequest, CreateInvoiceResponse,
    GetBalancesResponse, GetLnOnchainAddressResponse, GetNodeInfoResponse, GetRouteHintsResponse,
    InterceptPaymentRequest, InterceptPaymentResponse, InvoiceDescription, OpenChannelResponse,
    PayInvoiceResponse, PaymentAction, SendOnchainResponse,
};
use crate::rpc::{CloseChannelsWithPeerPayload, OpenChannelPayload, SendOnchainPayload};
35
36pub struct GatewayLdkClient {
37 node: Arc<ldk_node::Node>,
39
40 esplora_client: esplora_client::AsyncClient,
42
43 task_group: TaskGroup,
44
45 htlc_stream_receiver_or: Option<tokio::sync::mpsc::Receiver<InterceptPaymentRequest>>,
48
49 outbound_lightning_payment_lock_pool: lockable::LockPool<PaymentId>,
53}
54
55impl std::fmt::Debug for GatewayLdkClient {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("GatewayLdkClient").finish_non_exhaustive()
58 }
59}
60
61impl GatewayLdkClient {
62 pub fn new(
67 data_dir: &Path,
68 esplora_server_url: &str,
69 network: Network,
70 lightning_port: u16,
71 mnemonic: Mnemonic,
72 ) -> anyhow::Result<Self> {
73 let node_alias = if network == Network::Bitcoin {
78 None
79 } else {
80 let alias = format!("{network} LDK Gateway");
81 let mut bytes = [0u8; 32];
82 bytes[..alias.as_bytes().len()].copy_from_slice(alias.as_bytes());
83 Some(NodeAlias(bytes))
84 };
85
86 let mut node_builder = ldk_node::Builder::from_config(ldk_node::config::Config {
87 network,
88 listening_addresses: Some(vec![SocketAddress::TcpIpV4 {
89 addr: [0, 0, 0, 0],
90 port: lightning_port,
91 }]),
92 node_alias,
93 ..Default::default()
94 });
95 node_builder
96 .set_entropy_bip39_mnemonic(mnemonic, None)
97 .set_chain_source_esplora(
98 esplora_server_url.to_string(),
99 Some(EsploraSyncConfig {
100 onchain_wallet_sync_interval_secs: 10,
103 lightning_wallet_sync_interval_secs: 10,
104 ..Default::default()
105 }),
106 );
107 let Some(data_dir_str) = data_dir.to_str() else {
108 return Err(anyhow::anyhow!("Invalid data dir path"));
109 };
110 node_builder.set_storage_dir_path(data_dir_str.to_string());
111
112 let node = Arc::new(node_builder.build()?);
113 node.start().map_err(|e| {
116 error!(?e, "Failed to start LDK Node");
117 LightningRpcError::FailedToConnect
118 })?;
119
120 let (htlc_stream_sender, htlc_stream_receiver) = tokio::sync::mpsc::channel(1024);
121 let task_group = TaskGroup::new();
122
123 let node_clone = node.clone();
124 task_group.spawn("ldk lightning node event handler", |handle| async move {
125 loop {
126 Self::handle_next_event(&node_clone, &htlc_stream_sender, &handle).await;
127 }
128 });
129
130 Ok(GatewayLdkClient {
131 node,
132 esplora_client: esplora_client::Builder::new(esplora_server_url).build_async()?,
133 task_group,
134 htlc_stream_receiver_or: Some(htlc_stream_receiver),
135 outbound_lightning_payment_lock_pool: lockable::LockPool::new(),
136 })
137 }
138
139 async fn handle_next_event(
140 node: &ldk_node::Node,
141 htlc_stream_sender: &Sender<InterceptPaymentRequest>,
142 handle: &TaskHandle,
143 ) {
144 let event = tokio::select! {
148 event = node.next_event_async() => {
149 event
150 }
151 () = handle.make_shutdown_rx() => {
152 return;
153 }
154 };
155
156 if let ldk_node::Event::PaymentClaimable {
157 payment_id: _,
158 payment_hash,
159 claimable_amount_msat,
160 claim_deadline,
161 } = event
162 {
163 if let Err(e) = htlc_stream_sender
164 .send(InterceptPaymentRequest {
165 payment_hash: Hash::from_slice(&payment_hash.0).expect("Failed to create Hash"),
166 amount_msat: claimable_amount_msat,
167 expiry: claim_deadline.unwrap_or_default(),
168 short_channel_id: None,
169 incoming_chan_id: 0,
170 htlc_id: 0,
171 })
172 .await
173 {
174 error!(?e, "Failed send InterceptHtlcRequest to stream");
175 }
176 }
177
178 node.event_handled();
181 }
182
183 async fn outpoint_to_scid(&self, funding_txo: OutPoint) -> anyhow::Result<u64> {
186 let block_height = self
187 .esplora_client
188 .get_merkle_proof(&funding_txo.txid)
189 .await?
190 .ok_or(anyhow::anyhow!("Failed to get merkle proof"))?
191 .block_height;
192
193 let block_hash = self.esplora_client.get_block_hash(block_height).await?;
194
195 let block = self
196 .esplora_client
197 .get_block_by_hash(&block_hash)
198 .await?
199 .ok_or(anyhow::anyhow!("Failed to get block"))?;
200
201 let tx_index = block
202 .txdata
203 .iter()
204 .enumerate()
205 .find(|(_, tx)| tx.compute_txid() == funding_txo.txid)
206 .ok_or(anyhow::anyhow!("Failed to find transaction"))?
207 .0 as u32;
208
209 let output_index = funding_txo.vout;
210
211 scid_from_parts(
212 u64::from(block_height),
213 u64::from(tx_index),
214 u64::from(output_index),
215 )
216 .map_err(|e| anyhow::anyhow!("Failed to convert to short channel ID: {e:?}"))
217 }
218}
219
220impl Drop for GatewayLdkClient {
221 fn drop(&mut self) {
222 self.task_group.shutdown();
223
224 info!("Stopping LDK Node...");
225 if let Err(e) = self.node.stop() {
226 error!(?e, "Failed to stop LDK Node");
227 } else {
228 info!("LDK Node stopped.");
229 }
230 }
231}
232
233#[async_trait]
234impl ILnRpcClient for GatewayLdkClient {
235 async fn info(&self) -> Result<GetNodeInfoResponse, LightningRpcError> {
236 if is_env_var_set("FM_IN_DEVIMINT") {
239 block_in_place(|| {
240 let _ = self.node.sync_wallets();
241 });
242 }
243 let node_status = self.node.status();
244
245 let Some(esplora_chain_tip_block_summary) = self
246 .esplora_client
247 .get_blocks(None)
248 .await
249 .map_err(|e| LightningRpcError::FailedToGetNodeInfo {
250 failure_reason: format!("Failed get chain tip block summary: {e:?}"),
251 })?
252 .into_iter()
253 .next()
254 else {
255 return Err(LightningRpcError::FailedToGetNodeInfo {
256 failure_reason:
257 "Failed to get chain tip block summary (empty block list was returned)"
258 .to_string(),
259 });
260 };
261
262 let esplora_chain_tip_block_height = esplora_chain_tip_block_summary.time.height;
263 let ldk_block_height: u32 = node_status.current_best_block.height;
264 let synced_to_chain = esplora_chain_tip_block_height == ldk_block_height;
265
266 assert!(
267 esplora_chain_tip_block_height >= ldk_block_height,
268 "LDK Block Height is in the future"
269 );
270
271 Ok(GetNodeInfoResponse {
272 pub_key: self.node.node_id(),
273 alias: match self.node.node_alias() {
274 Some(alias) => alias.to_string(),
275 None => format!("LDK Fedimint Gateway Node {}", self.node.node_id()),
276 },
277 network: self.node.config().network.to_string(),
278 block_height: ldk_block_height,
279 synced_to_chain,
280 })
281 }
282
283 async fn routehints(
284 &self,
285 _num_route_hints: usize,
286 ) -> Result<GetRouteHintsResponse, LightningRpcError> {
287 Ok(GetRouteHintsResponse {
290 route_hints: vec![],
291 })
292 }
293
294 async fn pay(
295 &self,
296 invoice: Bolt11Invoice,
297 max_delay: u64,
298 max_fee: Amount,
299 ) -> Result<PayInvoiceResponse, LightningRpcError> {
300 let payment_id = PaymentId(*invoice.payment_hash().as_byte_array());
301
302 let _payment_lock_guard = self
308 .outbound_lightning_payment_lock_pool
309 .async_lock(payment_id)
310 .await;
311
312 if self.node.payment(&payment_id).is_none() {
319 assert_eq!(
320 self.node
321 .bolt11_payment()
322 .send(
323 &invoice,
324 Some(SendingParameters {
325 max_total_routing_fee_msat: Some(Some(max_fee.msats)),
326 max_total_cltv_expiry_delta: Some(max_delay as u32),
327 max_path_count: None,
328 max_channel_saturation_power_of_half: None,
329 }),
330 )
331 .map_err(|e| LightningRpcError::FailedPayment {
334 failure_reason: format!("LDK payment failed to initialize: {e:?}"),
335 })?,
336 payment_id
337 );
338 }
339
340 loop {
345 if let Some(payment_details) = self.node.payment(&payment_id) {
346 match payment_details.status {
347 PaymentStatus::Pending => {}
348 PaymentStatus::Succeeded => {
349 if let PaymentKind::Bolt11 {
350 preimage: Some(preimage),
351 ..
352 } = payment_details.kind
353 {
354 return Ok(PayInvoiceResponse {
355 preimage: Preimage(preimage.0),
356 });
357 }
358 }
359 PaymentStatus::Failed => {
360 return Err(LightningRpcError::FailedPayment {
361 failure_reason: "LDK payment failed".to_string(),
362 });
363 }
364 }
365 }
366 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
367 }
368 }
369
370 async fn route_htlcs<'a>(
371 mut self: Box<Self>,
372 _task_group: &TaskGroup,
373 ) -> Result<(RouteHtlcStream<'a>, Arc<dyn ILnRpcClient>), LightningRpcError> {
374 let route_htlc_stream = match self.htlc_stream_receiver_or.take() {
375 Some(stream) => Ok(Box::pin(ReceiverStream::new(stream))),
376 None => Err(LightningRpcError::FailedToRouteHtlcs {
377 failure_reason:
378 "Stream does not exist. Likely was already taken by calling `route_htlcs()`."
379 .to_string(),
380 }),
381 }?;
382
383 Ok((route_htlc_stream, Arc::new(*self)))
384 }
385
386 async fn complete_htlc(&self, htlc: InterceptPaymentResponse) -> Result<(), LightningRpcError> {
387 let InterceptPaymentResponse {
388 action,
389 payment_hash,
390 incoming_chan_id: _,
391 htlc_id: _,
392 } = htlc;
393
394 let ph = PaymentHash(*payment_hash.clone().as_byte_array());
395
396 let claimable_amount_msat = 999_999_999_999_999;
402
403 let ph_hex_str = hex::encode(payment_hash);
404
405 if let PaymentAction::Settle(preimage) = action {
406 self.node
407 .bolt11_payment()
408 .claim_for_hash(ph, claimable_amount_msat, PaymentPreimage(preimage.0))
409 .map_err(|_| LightningRpcError::FailedToCompleteHtlc {
410 failure_reason: format!("Failed to claim LDK payment with hash {ph_hex_str}"),
411 })?;
412 } else {
413 error!("Unwinding payment with hash {ph_hex_str} because the action was not `Settle`");
414 self.node.bolt11_payment().fail_for_hash(ph).map_err(|_| {
415 LightningRpcError::FailedToCompleteHtlc {
416 failure_reason: format!("Failed to unwind LDK payment with hash {ph_hex_str}"),
417 }
418 })?;
419 }
420
421 return Ok(());
422 }
423
424 async fn create_invoice(
425 &self,
426 create_invoice_request: CreateInvoiceRequest,
427 ) -> Result<CreateInvoiceResponse, LightningRpcError> {
428 let payment_hash_or = if let Some(payment_hash) = create_invoice_request.payment_hash {
429 let ph = PaymentHash(*payment_hash.as_byte_array());
430 Some(ph)
431 } else {
432 None
433 };
434
435 let description_str = match create_invoice_request.description {
440 Some(InvoiceDescription::Direct(desc)) => desc,
441 _ => String::new(),
442 };
443
444 let invoice = match payment_hash_or {
445 Some(payment_hash) => self.node.bolt11_payment().receive_for_hash(
446 create_invoice_request.amount_msat,
447 description_str.as_str(),
448 create_invoice_request.expiry_secs,
449 payment_hash,
450 ),
451 None => self.node.bolt11_payment().receive(
452 create_invoice_request.amount_msat,
453 description_str.as_str(),
454 create_invoice_request.expiry_secs,
455 ),
456 }
457 .map_err(|e| LightningRpcError::FailedToGetInvoice {
458 failure_reason: e.to_string(),
459 })?;
460
461 Ok(CreateInvoiceResponse {
462 invoice: invoice.to_string(),
463 })
464 }
465
466 async fn get_ln_onchain_address(
467 &self,
468 ) -> Result<GetLnOnchainAddressResponse, LightningRpcError> {
469 self.node
470 .onchain_payment()
471 .new_address()
472 .map(|address| GetLnOnchainAddressResponse {
473 address: address.to_string(),
474 })
475 .map_err(|e| LightningRpcError::FailedToGetLnOnchainAddress {
476 failure_reason: e.to_string(),
477 })
478 }
479
480 async fn send_onchain(
481 &self,
482 SendOnchainPayload {
483 address,
484 amount,
485 fee_rate_sats_per_vbyte: _,
489 }: SendOnchainPayload,
490 ) -> Result<SendOnchainResponse, LightningRpcError> {
491 let onchain = self.node.onchain_payment();
492
493 let txid = match amount {
494 BitcoinAmountOrAll::All => onchain.send_all_to_address(&address.assume_checked()),
495 BitcoinAmountOrAll::Amount(amount_sats) => {
496 onchain.send_to_address(&address.assume_checked(), amount_sats.to_sat())
497 }
498 }
499 .map_err(|e| LightningRpcError::FailedToWithdrawOnchain {
500 failure_reason: e.to_string(),
501 })?;
502
503 Ok(SendOnchainResponse {
504 txid: txid.to_string(),
505 })
506 }
507
508 async fn open_channel(
509 &self,
510 OpenChannelPayload {
511 pubkey,
512 host,
513 channel_size_sats,
514 push_amount_sats,
515 }: OpenChannelPayload,
516 ) -> Result<OpenChannelResponse, LightningRpcError> {
517 let push_amount_msats_or = if push_amount_sats == 0 {
518 None
519 } else {
520 Some(push_amount_sats * 1000)
521 };
522
523 let user_channel_id = self
524 .node
525 .open_announced_channel(
526 pubkey,
527 SocketAddress::from_str(&host).map_err(|e| {
528 LightningRpcError::FailedToConnectToPeer {
529 failure_reason: e.to_string(),
530 }
531 })?,
532 channel_size_sats,
533 push_amount_msats_or,
534 None,
535 )
536 .map_err(|e| LightningRpcError::FailedToOpenChannel {
537 failure_reason: e.to_string(),
538 })?;
539
540 for _ in 0..10 {
542 let funding_txid_or = self
543 .node
544 .list_channels()
545 .iter()
546 .find(|channel| channel.user_channel_id == user_channel_id)
547 .and_then(|channel| channel.funding_txo)
548 .map(|funding_txo| funding_txo.txid);
549
550 if let Some(funding_txid) = funding_txid_or {
551 return Ok(OpenChannelResponse {
552 funding_txid: funding_txid.to_string(),
553 });
554 }
555
556 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
557 }
558
559 Err(LightningRpcError::FailedToOpenChannel {
560 failure_reason: "Channel could not be opened".to_string(),
561 })
562 }
563
564 async fn close_channels_with_peer(
565 &self,
566 CloseChannelsWithPeerPayload { pubkey }: CloseChannelsWithPeerPayload,
567 ) -> Result<CloseChannelsWithPeerResponse, LightningRpcError> {
568 let mut num_channels_closed = 0;
569
570 for channel_with_peer in self
571 .node
572 .list_channels()
573 .iter()
574 .filter(|channel| channel.counterparty_node_id == pubkey)
575 {
576 if self
577 .node
578 .close_channel(&channel_with_peer.user_channel_id, pubkey)
579 .is_ok()
580 {
581 num_channels_closed += 1;
582 }
583 }
584
585 Ok(CloseChannelsWithPeerResponse {
586 num_channels_closed,
587 })
588 }
589
590 async fn list_active_channels(&self) -> Result<Vec<ChannelInfo>, LightningRpcError> {
591 let mut channels = Vec::new();
592
593 for channel_details in self
594 .node
595 .list_channels()
596 .iter()
597 .filter(|channel| channel.is_usable)
598 {
599 channels.push(ChannelInfo {
600 remote_pubkey: channel_details.counterparty_node_id,
601 channel_size_sats: channel_details.channel_value_sats,
602 outbound_liquidity_sats: channel_details.outbound_capacity_msat / 1000,
603 inbound_liquidity_sats: channel_details.inbound_capacity_msat / 1000,
604 short_channel_id: match channel_details.funding_txo {
605 Some(funding_txo) => self.outpoint_to_scid(funding_txo).await.unwrap_or(0),
606 None => 0,
607 },
608 });
609 }
610
611 Ok(channels)
612 }
613
614 async fn get_balances(&self) -> Result<GetBalancesResponse, LightningRpcError> {
615 let balances = self.node.list_balances();
616 let channel_lists = self
617 .node
618 .list_channels()
619 .into_iter()
620 .filter(|chan| chan.is_usable)
621 .collect::<Vec<_>>();
622 let total_inbound_liquidity_balance_msat: u64 = channel_lists
624 .iter()
625 .map(|channel| channel.inbound_capacity_msat)
626 .sum();
627
628 Ok(GetBalancesResponse {
629 onchain_balance_sats: balances.total_onchain_balance_sats,
630 lightning_balance_msats: balances.total_lightning_balance_sats * 1000,
631 inbound_lightning_liquidity_msats: total_inbound_liquidity_balance_msat,
632 })
633 }
634}