1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bitcoin::hashes::Hash;
8use bitcoin::{Network, OutPoint};
9use fedimint_bip39::Mnemonic;
10use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
11use fedimint_core::envs::{is_env_var_set, BitcoinRpcConfig};
12use fedimint_core::task::{block_in_place, TaskGroup, TaskHandle};
13use fedimint_core::util::SafeUrl;
14use fedimint_core::{Amount, BitcoinAmountOrAll};
15use fedimint_ln_common::contracts::Preimage;
16use ldk_node::lightning::ln::msgs::SocketAddress;
17use ldk_node::lightning::ln::PaymentHash;
18use ldk_node::lightning::routing::gossip::NodeAlias;
19use ldk_node::payment::{PaymentKind, PaymentStatus, SendingParameters};
20use lightning::ln::channelmanager::PaymentId;
21use lightning::ln::PaymentPreimage;
22use lightning::util::scid_utils::scid_from_parts;
23use lightning_invoice::Bolt11Invoice;
24use tokio::sync::mpsc::Sender;
25use tokio_stream::wrappers::ReceiverStream;
26use tracing::{error, info};
27
28use super::{
29 ChannelInfo, ILnRpcClient, LightningRpcError, ListActiveChannelsResponse, RouteHtlcStream,
30};
31use crate::{
32 CloseChannelsWithPeerRequest, CloseChannelsWithPeerResponse, CreateInvoiceRequest,
33 CreateInvoiceResponse, GetBalancesResponse, GetLnOnchainAddressResponse, GetNodeInfoResponse,
34 GetRouteHintsResponse, InterceptPaymentRequest, InterceptPaymentResponse, InvoiceDescription,
35 OpenChannelRequest, OpenChannelResponse, PayInvoiceResponse, PaymentAction, SendOnchainRequest,
36 SendOnchainResponse,
37};
38
39pub enum GatewayLdkChainSourceConfig {
40 Bitcoind { server_url: SafeUrl },
41 Esplora { server_url: SafeUrl },
42}
43
44impl GatewayLdkChainSourceConfig {
45 fn bitcoin_rpc_config(&self) -> BitcoinRpcConfig {
46 match self {
47 Self::Bitcoind { server_url } => BitcoinRpcConfig {
48 kind: "bitcoind".to_string(),
49 url: server_url.clone(),
50 },
51 Self::Esplora { server_url } => BitcoinRpcConfig {
52 kind: "esplora".to_string(),
53 url: server_url.clone(),
54 },
55 }
56 }
57}
58
59pub struct GatewayLdkClient {
60 node: Arc<ldk_node::Node>,
62
63 bitcoind_rpc: DynBitcoindRpc,
65
66 task_group: TaskGroup,
67
68 htlc_stream_receiver_or: Option<tokio::sync::mpsc::Receiver<InterceptPaymentRequest>>,
71
72 outbound_lightning_payment_lock_pool: lockable::LockPool<PaymentId>,
76}
77
78impl std::fmt::Debug for GatewayLdkClient {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("GatewayLdkClient").finish_non_exhaustive()
81 }
82}
83
84impl GatewayLdkClient {
85 pub fn new(
90 data_dir: &Path,
91 chain_source_config: GatewayLdkChainSourceConfig,
92 network: Network,
93 lightning_port: u16,
94 mnemonic: Mnemonic,
95 runtime: Arc<tokio::runtime::Runtime>,
96 ) -> anyhow::Result<Self> {
97 let node_alias = if network == Network::Bitcoin {
102 None
103 } else {
104 let alias = format!("{network} LDK Gateway");
105 let mut bytes = [0u8; 32];
106 bytes[..alias.as_bytes().len()].copy_from_slice(alias.as_bytes());
107 Some(NodeAlias(bytes))
108 };
109
110 let mut node_builder = ldk_node::Builder::from_config(ldk_node::config::Config {
111 network,
112 listening_addresses: Some(vec![SocketAddress::TcpIpV4 {
113 addr: [0, 0, 0, 0],
114 port: lightning_port,
115 }]),
116 node_alias,
117 ..Default::default()
118 });
119
120 node_builder.set_entropy_bip39_mnemonic(mnemonic, None);
121
122 let bitcoind_rpc = create_bitcoind(&chain_source_config.bitcoin_rpc_config())?;
123
124 match chain_source_config {
125 GatewayLdkChainSourceConfig::Bitcoind { server_url } => {
126 node_builder.set_chain_source_bitcoind_rpc(
127 server_url
128 .host_str()
129 .expect("Could not retrieve host from bitcoind RPC url")
130 .to_string(),
131 server_url
132 .port()
133 .expect("Could not retrieve port from bitcoind RPC url"),
134 server_url.username().to_string(),
135 server_url.password().unwrap_or_default().to_string(),
136 );
137 }
138 GatewayLdkChainSourceConfig::Esplora { server_url } => {
139 node_builder.set_chain_source_esplora(server_url.to_string(), None);
140 }
141 };
142 let Some(data_dir_str) = data_dir.to_str() else {
143 return Err(anyhow::anyhow!("Invalid data dir path"));
144 };
145 node_builder.set_storage_dir_path(data_dir_str.to_string());
146
147 let node = Arc::new(node_builder.build()?);
148 node.start_with_runtime(runtime).map_err(|e| {
149 error!(?e, "Failed to start LDK Node");
150 LightningRpcError::FailedToConnect
151 })?;
152
153 let (htlc_stream_sender, htlc_stream_receiver) = tokio::sync::mpsc::channel(1024);
154 let task_group = TaskGroup::new();
155
156 let node_clone = node.clone();
157 task_group.spawn("ldk lightning node event handler", |handle| async move {
158 loop {
159 Self::handle_next_event(&node_clone, &htlc_stream_sender, &handle).await;
160 }
161 });
162
163 Ok(GatewayLdkClient {
164 node,
165 bitcoind_rpc,
166 task_group,
167 htlc_stream_receiver_or: Some(htlc_stream_receiver),
168 outbound_lightning_payment_lock_pool: lockable::LockPool::new(),
169 })
170 }
171
172 async fn handle_next_event(
173 node: &ldk_node::Node,
174 htlc_stream_sender: &Sender<InterceptPaymentRequest>,
175 handle: &TaskHandle,
176 ) {
177 let event = tokio::select! {
181 event = node.next_event_async() => {
182 event
183 }
184 () = handle.make_shutdown_rx() => {
185 return;
186 }
187 };
188
189 if let ldk_node::Event::PaymentClaimable {
190 payment_id: _,
191 payment_hash,
192 claimable_amount_msat,
193 claim_deadline,
194 } = event
195 {
196 if let Err(e) = htlc_stream_sender
197 .send(InterceptPaymentRequest {
198 payment_hash: Hash::from_slice(&payment_hash.0).expect("Failed to create Hash"),
199 amount_msat: claimable_amount_msat,
200 expiry: claim_deadline.unwrap_or_default(),
201 short_channel_id: None,
202 incoming_chan_id: 0,
203 htlc_id: 0,
204 })
205 .await
206 {
207 error!(?e, "Failed send InterceptHtlcRequest to stream");
208 }
209 }
210
211 node.event_handled();
214 }
215
216 async fn outpoint_to_scid(&self, funding_txo: OutPoint) -> anyhow::Result<u64> {
219 let block_hash = self
220 .bitcoind_rpc
221 .get_txout_proof(funding_txo.txid)
222 .await?
223 .block_header
224 .block_hash();
225
226 let block_height = self
227 .bitcoind_rpc
228 .get_tx_block_height(&funding_txo.txid)
229 .await?
230 .ok_or(anyhow::anyhow!("Failed to get block height"))?;
231
232 let block = self.bitcoind_rpc.get_block(&block_hash).await?;
233
234 let tx_index = block
235 .txdata
236 .iter()
237 .enumerate()
238 .find(|(_, tx)| tx.compute_txid() == funding_txo.txid)
239 .ok_or(anyhow::anyhow!("Failed to find transaction"))?
240 .0 as u32;
241
242 let output_index = funding_txo.vout;
243
244 scid_from_parts(block_height, u64::from(tx_index), u64::from(output_index))
245 .map_err(|e| anyhow::anyhow!("Failed to convert to short channel ID: {e:?}"))
246 }
247}
248
249impl Drop for GatewayLdkClient {
250 fn drop(&mut self) {
251 self.task_group.shutdown();
252
253 info!("Stopping LDK Node...");
254 if let Err(e) = self.node.stop() {
255 error!(?e, "Failed to stop LDK Node");
256 } else {
257 info!("LDK Node stopped.");
258 }
259 }
260}
261
262#[async_trait]
263impl ILnRpcClient for GatewayLdkClient {
264 async fn info(&self) -> Result<GetNodeInfoResponse, LightningRpcError> {
265 if is_env_var_set("FM_IN_DEVIMINT") {
268 block_in_place(|| {
269 let _ = self.node.sync_wallets();
270 });
271 }
272 let node_status = self.node.status();
273
274 let chain_tip_block_height =
275 u32::try_from(self.bitcoind_rpc.get_block_count().await.map_err(|e| {
276 LightningRpcError::FailedToGetNodeInfo {
277 failure_reason: format!("Failed to get block count from chain source: {e}"),
278 }
279 })?)
280 .expect("Failed to convert block count to u32")
281 - 1;
282 let ldk_block_height = node_status.current_best_block.height;
283 let synced_to_chain = chain_tip_block_height == ldk_block_height;
284
285 assert!(
286 chain_tip_block_height >= ldk_block_height,
287 "LDK Block Height is in the future"
288 );
289
290 Ok(GetNodeInfoResponse {
291 pub_key: self.node.node_id(),
292 alias: match self.node.node_alias() {
293 Some(alias) => alias.to_string(),
294 None => format!("LDK Fedimint Gateway Node {}", self.node.node_id()),
295 },
296 network: self.node.config().network.to_string(),
297 block_height: ldk_block_height,
298 synced_to_chain,
299 })
300 }
301
302 async fn routehints(
303 &self,
304 _num_route_hints: usize,
305 ) -> Result<GetRouteHintsResponse, LightningRpcError> {
306 Ok(GetRouteHintsResponse {
312 route_hints: vec![],
313 })
314 }
315
316 async fn pay(
317 &self,
318 invoice: Bolt11Invoice,
319 max_delay: u64,
320 max_fee: Amount,
321 ) -> Result<PayInvoiceResponse, LightningRpcError> {
322 let payment_id = PaymentId(*invoice.payment_hash().as_byte_array());
323
324 let _payment_lock_guard = self
330 .outbound_lightning_payment_lock_pool
331 .async_lock(payment_id)
332 .await;
333
334 if self.node.payment(&payment_id).is_none() {
341 assert_eq!(
342 self.node
343 .bolt11_payment()
344 .send(
345 &invoice,
346 Some(SendingParameters {
347 max_total_routing_fee_msat: Some(Some(max_fee.msats)),
348 max_total_cltv_expiry_delta: Some(max_delay as u32),
349 max_path_count: None,
350 max_channel_saturation_power_of_half: None,
351 }),
352 )
353 .map_err(|e| LightningRpcError::FailedPayment {
356 failure_reason: format!("LDK payment failed to initialize: {e:?}"),
357 })?,
358 payment_id
359 );
360 }
361
362 loop {
367 if let Some(payment_details) = self.node.payment(&payment_id) {
368 match payment_details.status {
369 PaymentStatus::Pending => {}
370 PaymentStatus::Succeeded => {
371 if let PaymentKind::Bolt11 {
372 preimage: Some(preimage),
373 ..
374 } = payment_details.kind
375 {
376 return Ok(PayInvoiceResponse {
377 preimage: Preimage(preimage.0),
378 });
379 }
380 }
381 PaymentStatus::Failed => {
382 return Err(LightningRpcError::FailedPayment {
383 failure_reason: "LDK payment failed".to_string(),
384 });
385 }
386 }
387 }
388 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
389 }
390 }
391
392 async fn route_htlcs<'a>(
393 mut self: Box<Self>,
394 _task_group: &TaskGroup,
395 ) -> Result<(RouteHtlcStream<'a>, Arc<dyn ILnRpcClient>), LightningRpcError> {
396 let route_htlc_stream = match self.htlc_stream_receiver_or.take() {
397 Some(stream) => Ok(Box::pin(ReceiverStream::new(stream))),
398 None => Err(LightningRpcError::FailedToRouteHtlcs {
399 failure_reason:
400 "Stream does not exist. Likely was already taken by calling `route_htlcs()`."
401 .to_string(),
402 }),
403 }?;
404
405 Ok((route_htlc_stream, Arc::new(*self)))
406 }
407
408 async fn complete_htlc(&self, htlc: InterceptPaymentResponse) -> Result<(), LightningRpcError> {
409 let InterceptPaymentResponse {
410 action,
411 payment_hash,
412 incoming_chan_id: _,
413 htlc_id: _,
414 } = htlc;
415
416 let ph = PaymentHash(*payment_hash.clone().as_byte_array());
417
418 let claimable_amount_msat = 999_999_999_999_999;
424
425 let ph_hex_str = hex::encode(payment_hash);
426
427 if let PaymentAction::Settle(preimage) = action {
428 self.node
429 .bolt11_payment()
430 .claim_for_hash(ph, claimable_amount_msat, PaymentPreimage(preimage.0))
431 .map_err(|_| LightningRpcError::FailedToCompleteHtlc {
432 failure_reason: format!("Failed to claim LDK payment with hash {ph_hex_str}"),
433 })?;
434 } else {
435 error!("Unwinding payment with hash {ph_hex_str} because the action was not `Settle`");
436 self.node.bolt11_payment().fail_for_hash(ph).map_err(|_| {
437 LightningRpcError::FailedToCompleteHtlc {
438 failure_reason: format!("Failed to unwind LDK payment with hash {ph_hex_str}"),
439 }
440 })?;
441 }
442
443 return Ok(());
444 }
445
446 async fn create_invoice(
447 &self,
448 create_invoice_request: CreateInvoiceRequest,
449 ) -> Result<CreateInvoiceResponse, LightningRpcError> {
450 let payment_hash_or = if let Some(payment_hash) = create_invoice_request.payment_hash {
451 let ph = PaymentHash(*payment_hash.as_byte_array());
452 Some(ph)
453 } else {
454 None
455 };
456
457 let description_str = match create_invoice_request.description {
462 Some(InvoiceDescription::Direct(desc)) => desc,
463 _ => String::new(),
464 };
465
466 let invoice = match payment_hash_or {
467 Some(payment_hash) => self.node.bolt11_payment().receive_for_hash(
468 create_invoice_request.amount_msat,
469 description_str.as_str(),
470 create_invoice_request.expiry_secs,
471 payment_hash,
472 ),
473 None => self.node.bolt11_payment().receive(
474 create_invoice_request.amount_msat,
475 description_str.as_str(),
476 create_invoice_request.expiry_secs,
477 ),
478 }
479 .map_err(|e| LightningRpcError::FailedToGetInvoice {
480 failure_reason: e.to_string(),
481 })?;
482
483 Ok(CreateInvoiceResponse {
484 invoice: invoice.to_string(),
485 })
486 }
487
488 async fn get_ln_onchain_address(
489 &self,
490 ) -> Result<GetLnOnchainAddressResponse, LightningRpcError> {
491 self.node
492 .onchain_payment()
493 .new_address()
494 .map(|address| GetLnOnchainAddressResponse {
495 address: address.to_string(),
496 })
497 .map_err(|e| LightningRpcError::FailedToGetLnOnchainAddress {
498 failure_reason: e.to_string(),
499 })
500 }
501
502 async fn send_onchain(
503 &self,
504 SendOnchainRequest {
505 address,
506 amount,
507 fee_rate_sats_per_vbyte: _,
511 }: SendOnchainRequest,
512 ) -> Result<SendOnchainResponse, LightningRpcError> {
513 let onchain = self.node.onchain_payment();
514
515 let txid = match amount {
516 BitcoinAmountOrAll::All => onchain.send_all_to_address(&address.assume_checked()),
517 BitcoinAmountOrAll::Amount(amount_sats) => {
518 onchain.send_to_address(&address.assume_checked(), amount_sats.to_sat())
519 }
520 }
521 .map_err(|e| LightningRpcError::FailedToWithdrawOnchain {
522 failure_reason: e.to_string(),
523 })?;
524
525 Ok(SendOnchainResponse {
526 txid: txid.to_string(),
527 })
528 }
529
530 async fn open_channel(
531 &self,
532 OpenChannelRequest {
533 pubkey,
534 host,
535 channel_size_sats,
536 push_amount_sats,
537 }: OpenChannelRequest,
538 ) -> Result<OpenChannelResponse, LightningRpcError> {
539 let push_amount_msats_or = if push_amount_sats == 0 {
540 None
541 } else {
542 Some(push_amount_sats * 1000)
543 };
544
545 let user_channel_id = self
546 .node
547 .open_announced_channel(
548 pubkey,
549 SocketAddress::from_str(&host).map_err(|e| {
550 LightningRpcError::FailedToConnectToPeer {
551 failure_reason: e.to_string(),
552 }
553 })?,
554 channel_size_sats,
555 push_amount_msats_or,
556 None,
557 )
558 .map_err(|e| LightningRpcError::FailedToOpenChannel {
559 failure_reason: e.to_string(),
560 })?;
561
562 for _ in 0..10 {
564 let funding_txid_or = self
565 .node
566 .list_channels()
567 .iter()
568 .find(|channel| channel.user_channel_id == user_channel_id)
569 .and_then(|channel| channel.funding_txo)
570 .map(|funding_txo| funding_txo.txid);
571
572 if let Some(funding_txid) = funding_txid_or {
573 return Ok(OpenChannelResponse {
574 funding_txid: funding_txid.to_string(),
575 });
576 }
577
578 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
579 }
580
581 Err(LightningRpcError::FailedToOpenChannel {
582 failure_reason: "Channel could not be opened".to_string(),
583 })
584 }
585
586 async fn close_channels_with_peer(
587 &self,
588 CloseChannelsWithPeerRequest { pubkey }: CloseChannelsWithPeerRequest,
589 ) -> Result<CloseChannelsWithPeerResponse, LightningRpcError> {
590 let mut num_channels_closed = 0;
591
592 for channel_with_peer in self
593 .node
594 .list_channels()
595 .iter()
596 .filter(|channel| channel.counterparty_node_id == pubkey)
597 {
598 if self
599 .node
600 .close_channel(&channel_with_peer.user_channel_id, pubkey)
601 .is_ok()
602 {
603 num_channels_closed += 1;
604 }
605 }
606
607 Ok(CloseChannelsWithPeerResponse {
608 num_channels_closed,
609 })
610 }
611
612 async fn list_active_channels(&self) -> Result<ListActiveChannelsResponse, LightningRpcError> {
613 let mut channels = Vec::new();
614
615 for channel_details in self
616 .node
617 .list_channels()
618 .iter()
619 .filter(|channel| channel.is_usable)
620 {
621 channels.push(ChannelInfo {
622 remote_pubkey: channel_details.counterparty_node_id,
623 channel_size_sats: channel_details.channel_value_sats,
624 outbound_liquidity_sats: channel_details.outbound_capacity_msat / 1000,
625 inbound_liquidity_sats: channel_details.inbound_capacity_msat / 1000,
626 short_channel_id: match channel_details.funding_txo {
627 Some(funding_txo) => self.outpoint_to_scid(funding_txo).await.unwrap_or(0),
628 None => 0,
629 },
630 });
631 }
632
633 Ok(ListActiveChannelsResponse { channels })
634 }
635
636 async fn get_balances(&self) -> Result<GetBalancesResponse, LightningRpcError> {
637 let balances = self.node.list_balances();
638 let channel_lists = self
639 .node
640 .list_channels()
641 .into_iter()
642 .filter(|chan| chan.is_usable)
643 .collect::<Vec<_>>();
644 let total_inbound_liquidity_balance_msat: u64 = channel_lists
646 .iter()
647 .map(|channel| channel.inbound_capacity_msat)
648 .sum();
649
650 Ok(GetBalancesResponse {
651 onchain_balance_sats: balances.total_onchain_balance_sats,
652 lightning_balance_msats: balances.total_lightning_balance_sats * 1000,
653 inbound_lightning_liquidity_msats: total_inbound_liquidity_balance_msat,
654 })
655 }
656}