fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::BTreeMap;
8use std::time::Duration;
9
10use anyhow::{anyhow, ensure, format_err, Context};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_bitcoind::create_bitcoind;
13use fedimint_core::bitcoin::hashes::sha256;
14use fedimint_core::config::{
15    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
16    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
17};
18use fedimint_core::core::ModuleInstanceId;
19use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
20use fedimint_core::encoding::Encodable;
21use fedimint_core::module::audit::Audit;
22use fedimint_core::module::{
23    api_endpoint, ApiEndpoint, ApiError, ApiVersion, CoreConsensusVersion, InputMeta,
24    ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
25    TransactionItemAmount, CORE_CONSENSUS_VERSION,
26};
27use fedimint_core::task::{timeout, TaskGroup};
28use fedimint_core::time::duration_since_epoch;
29use fedimint_core::util::SafeUrl;
30use fedimint_core::{
31    apply, async_trait_maybe_send, push_db_pair_items, BitcoinHash, InPoint, NumPeers, NumPeersExt,
32    OutPoint, PeerId,
33};
34use fedimint_lnv2_common::config::{
35    LightningClientConfig, LightningConfig, LightningConfigConsensus, LightningConfigLocal,
36    LightningConfigPrivate, LightningGenParams,
37};
38use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
39use fedimint_lnv2_common::endpoint_constants::{
40    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_PREIMAGE_ENDPOINT,
41    CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT, GATEWAYS_ENDPOINT,
42    OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
43};
44use fedimint_lnv2_common::{
45    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
46    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
47    LightningOutputOutcome, LightningOutputV0, OutgoingWitness, MODULE_CONSENSUS_VERSION,
48};
49use fedimint_logging::LOG_MODULE_LNV2;
50use fedimint_server::config::distributedgen::{eval_poly_g1, PeerHandleOps};
51use fedimint_server::core::{
52    DynServerModule, ServerModule, ServerModuleInit, ServerModuleInitArgs,
53};
54use fedimint_server::net::api::check_auth;
55use futures::StreamExt;
56use group::ff::Field;
57use group::Curve;
58use rand::SeedableRng;
59use rand_chacha::ChaChaRng;
60use strum::IntoEnumIterator;
61use tokio::sync::watch;
62use tpe::{
63    derive_pk_share, AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare,
64};
65use tracing::trace;
66
67use crate::db::{
68    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
69    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractKey,
70    IncomingContractPrefix, OutgoingContractKey, OutgoingContractPrefix, PreimageKey,
71    PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
72};
73
/// Stateless initializer for the lnv2 server module.
///
/// The `ModuleInit` and `ServerModuleInit` implementations for the module are
/// attached to this type.
#[derive(Clone, Debug)]
pub struct LightningInit;
76
77impl ModuleInit for LightningInit {
78    type Common = LightningCommonInit;
79
80    async fn dump_database(
81        &self,
82        dbtx: &mut DatabaseTransaction<'_>,
83        prefix_names: Vec<String>,
84    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
85        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
86            BTreeMap::new();
87
88        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
89            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
90        });
91
92        for table in filtered_prefixes {
93            match table {
94                DbKeyPrefix::BlockCountVote => {
95                    push_db_pair_items!(
96                        dbtx,
97                        BlockCountVotePrefix,
98                        BlockCountVoteKey,
99                        u64,
100                        lightning,
101                        "Lightning Block Count Votes"
102                    );
103                }
104                DbKeyPrefix::UnixTimeVote => {
105                    push_db_pair_items!(
106                        dbtx,
107                        UnixTimeVotePrefix,
108                        UnixTimeVoteKey,
109                        u64,
110                        lightning,
111                        "Lightning Unix Time Votes"
112                    );
113                }
114                DbKeyPrefix::OutgoingContract => {
115                    push_db_pair_items!(
116                        dbtx,
117                        OutgoingContractPrefix,
118                        LightningOutgoingContractKey,
119                        OutgoingContract,
120                        lightning,
121                        "Lightning Outgoing Contracts"
122                    );
123                }
124                DbKeyPrefix::IncomingContract => {
125                    push_db_pair_items!(
126                        dbtx,
127                        IncomingContractPrefix,
128                        LightningIncomingContractKey,
129                        IncomingContract,
130                        lightning,
131                        "Lightning Incoming Contracts"
132                    );
133                }
134                DbKeyPrefix::DecryptionKeyShare => {
135                    push_db_pair_items!(
136                        dbtx,
137                        DecryptionKeySharePrefix,
138                        DecryptionKeyShareKey,
139                        DecryptionKeyShare,
140                        lightning,
141                        "Lightning Decryption Key Share"
142                    );
143                }
144                DbKeyPrefix::Preimage => {
145                    push_db_pair_items!(
146                        dbtx,
147                        PreimagePrefix,
148                        LightningPreimageKey,
149                        [u8; 32],
150                        lightning,
151                        "Lightning Preimages"
152                    );
153                }
154                DbKeyPrefix::Gateway => {
155                    push_db_pair_items!(
156                        dbtx,
157                        GatewayPrefix,
158                        GatewayKey,
159                        (),
160                        lightning,
161                        "Lightning Gateways"
162                    );
163                }
164            }
165        }
166
167        Box::new(lightning.into_iter())
168    }
169}
170
171#[apply(async_trait_maybe_send!)]
172impl ServerModuleInit for LightningInit {
173    type Params = LightningGenParams;
174
175    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
176        &[MODULE_CONSENSUS_VERSION]
177    }
178
179    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
180        SupportedModuleApiVersions::from_raw(
181            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
182            (
183                MODULE_CONSENSUS_VERSION.major,
184                MODULE_CONSENSUS_VERSION.minor,
185            ),
186            &[(0, 0)],
187        )
188    }
189
190    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<DynServerModule> {
191        Ok(Lightning::new(args.cfg().to_typed()?, args.task_group())?.into())
192    }
193
194    fn trusted_dealer_gen(
195        &self,
196        peers: &[PeerId],
197        params: &ConfigGenModuleParams,
198    ) -> BTreeMap<PeerId, ServerModuleConfig> {
199        let params = self
200            .parse_params(params)
201            .expect("Failed tp parse lnv2 config gen params");
202
203        let tpe_pks = peers
204            .iter()
205            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
206            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
207
208        peers
209            .iter()
210            .map(|peer| {
211                let cfg = LightningConfig {
212                    local: LightningConfigLocal {
213                        bitcoin_rpc: params.local.bitcoin_rpc.clone(),
214                    },
215                    consensus: LightningConfigConsensus {
216                        tpe_agg_pk: dealer_agg_pk(),
217                        tpe_pks: tpe_pks.clone(),
218                        fee_consensus: params.consensus.fee_consensus.clone(),
219                        network: params.consensus.network,
220                    },
221                    private: LightningConfigPrivate {
222                        sk: dealer_sk(peers.to_num_peers(), *peer),
223                    },
224                };
225
226                (*peer, cfg.to_erased())
227            })
228            .collect()
229    }
230
231    async fn distributed_gen(
232        &self,
233        peers: &PeerHandle,
234        params: &ConfigGenModuleParams,
235    ) -> anyhow::Result<ServerModuleConfig> {
236        let params = self.parse_params(params).unwrap();
237        let (polynomial, sks) = peers.run_dkg_g1().await?;
238
239        let server = LightningConfig {
240            local: LightningConfigLocal {
241                bitcoin_rpc: params.local.bitcoin_rpc.clone(),
242            },
243            consensus: LightningConfigConsensus {
244                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
245                tpe_pks: peers
246                    .num_peers()
247                    .peer_ids()
248                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
249                    .collect(),
250                fee_consensus: params.consensus.fee_consensus.clone(),
251                network: params.consensus.network,
252            },
253            private: LightningConfigPrivate {
254                sk: SecretKeyShare(sks),
255            },
256        };
257
258        Ok(server.to_erased())
259    }
260
261    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
262        let config = config.to_typed::<LightningConfig>()?;
263
264        ensure!(
265            tpe::derive_pk_share(&config.private.sk)
266                == *config
267                    .consensus
268                    .tpe_pks
269                    .get(identity)
270                    .context("Public key set has no key for our identity")?,
271            "Preimge encryption secret key share does not match our public key share"
272        );
273
274        Ok(())
275    }
276
277    fn get_client_config(
278        &self,
279        config: &ServerModuleConsensusConfig,
280    ) -> anyhow::Result<LightningClientConfig> {
281        let config = LightningConfigConsensus::from_erased(config)?;
282        Ok(LightningClientConfig {
283            tpe_agg_pk: config.tpe_agg_pk,
284            tpe_pks: config.tpe_pks,
285            fee_consensus: config.fee_consensus,
286            network: config.network,
287        })
288    }
289}
290
291fn dealer_agg_pk() -> AggregatePublicKey {
292    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
293}
294
295fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
296    derive_pk_share(&dealer_sk(num_peers, peer))
297}
298
299fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
300    let x = Scalar::from(peer.to_usize() as u64 + 1);
301
302    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
303    // using the Horner schema.
304
305    let y = (0..num_peers.threshold())
306        .map(|index| coefficient(index as u64))
307        .rev()
308        .reduce(|accumulator, c| accumulator * x + c)
309        .expect("We have at least one coefficient");
310
311    SecretKeyShare(y)
312}
313
314fn coefficient(index: u64) -> Scalar {
315    Scalar::random(&mut ChaChaRng::from_seed(
316        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
317    ))
318}
319
320#[derive(Debug)]
321pub struct Lightning {
322    cfg: LightningConfig,
323    /// Block count updated periodically by a background task
324    block_count_rx: watch::Receiver<Option<u64>>,
325}
326
327#[apply(async_trait_maybe_send!)]
328impl ServerModule for Lightning {
329    type Common = LightningModuleTypes;
330    type Init = LightningInit;
331
332    async fn consensus_proposal(
333        &self,
334        _dbtx: &mut DatabaseTransaction<'_>,
335    ) -> Vec<LightningConsensusItem> {
336        let mut items = vec![LightningConsensusItem::UnixTimeVote(
337            duration_since_epoch().as_secs(),
338        )];
339
340        if let Ok(block_count) = self.get_block_count() {
341            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
342            items.push(LightningConsensusItem::BlockCountVote(block_count));
343        }
344
345        items
346    }
347
348    async fn process_consensus_item<'a, 'b>(
349        &'a self,
350        dbtx: &mut DatabaseTransaction<'b>,
351        consensus_item: LightningConsensusItem,
352        peer: PeerId,
353    ) -> anyhow::Result<()> {
354        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
355        match consensus_item {
356            LightningConsensusItem::BlockCountVote(vote) => {
357                let current_vote = dbtx
358                    .insert_entry(&BlockCountVoteKey(peer), &vote)
359                    .await
360                    .unwrap_or(0);
361
362                ensure!(current_vote < vote, "Block count vote is redundant");
363
364                Ok(())
365            }
366            LightningConsensusItem::UnixTimeVote(vote) => {
367                let current_vote = dbtx
368                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
369                    .await
370                    .unwrap_or(0);
371
372                ensure!(current_vote < vote, "Unix time vote is redundant");
373
374                Ok(())
375            }
376            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
377                "Received lnv2 consensus item with unknown variant {variant}"
378            )),
379        }
380    }
381
382    async fn process_input<'a, 'b, 'c>(
383        &'a self,
384        dbtx: &mut DatabaseTransaction<'c>,
385        input: &'b LightningInput,
386        _in_point: InPoint,
387    ) -> Result<InputMeta, LightningInputError> {
388        let (pub_key, amount) = match input.ensure_v0_ref()? {
389            LightningInputV0::Outgoing(contract_id, outgoing_witness) => {
390                let contract = dbtx
391                    .remove_entry(&OutgoingContractKey(*contract_id))
392                    .await
393                    .ok_or(LightningInputError::UnknownContract)?;
394
395                let pub_key = match outgoing_witness {
396                    OutgoingWitness::Claim(preimage) => {
397                        if contract.expiration <= self.consensus_block_count(dbtx).await {
398                            return Err(LightningInputError::Expired);
399                        }
400
401                        if !contract.verify_preimage(preimage) {
402                            return Err(LightningInputError::InvalidPreimage);
403                        }
404
405                        dbtx.insert_entry(&PreimageKey(*contract_id), preimage)
406                            .await;
407
408                        contract.claim_pk
409                    }
410                    OutgoingWitness::Refund => {
411                        if contract.expiration > self.consensus_block_count(dbtx).await {
412                            return Err(LightningInputError::NotExpired);
413                        }
414
415                        contract.refund_pk
416                    }
417                    OutgoingWitness::Cancel(forfeit_signature) => {
418                        if !contract.verify_forfeit_signature(forfeit_signature) {
419                            return Err(LightningInputError::InvalidForfeitSignature);
420                        }
421
422                        contract.refund_pk
423                    }
424                };
425
426                (pub_key, contract.amount)
427            }
428            LightningInputV0::Incoming(contract_id, agg_decryption_key) => {
429                let contract = dbtx
430                    .remove_entry(&IncomingContractKey(*contract_id))
431                    .await
432                    .ok_or(LightningInputError::UnknownContract)?;
433
434                if !contract
435                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
436                {
437                    return Err(LightningInputError::InvalidDecryptionKey);
438                }
439
440                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
441                    Some(..) => contract.commitment.claim_pk,
442                    None => contract.commitment.refund_pk,
443                };
444
445                (pub_key, contract.commitment.amount)
446            }
447        };
448
449        Ok(InputMeta {
450            amount: TransactionItemAmount {
451                amount,
452                fee: self.cfg.consensus.fee_consensus.fee(amount),
453            },
454            pub_key,
455        })
456    }
457
458    async fn process_output<'a, 'b>(
459        &'a self,
460        dbtx: &mut DatabaseTransaction<'b>,
461        output: &'a LightningOutput,
462        _outpoint: OutPoint,
463    ) -> Result<TransactionItemAmount, LightningOutputError> {
464        let amount = match output.ensure_v0_ref()? {
465            LightningOutputV0::Outgoing(contract) => {
466                if dbtx
467                    .insert_entry(&OutgoingContractKey(contract.contract_id()), contract)
468                    .await
469                    .is_some()
470                {
471                    return Err(LightningOutputError::ContractAlreadyExists);
472                }
473
474                contract.amount
475            }
476            LightningOutputV0::Incoming(contract) => {
477                if !contract.verify() {
478                    return Err(LightningOutputError::InvalidContract);
479                }
480
481                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
482                    return Err(LightningOutputError::ContractExpired);
483                }
484
485                if dbtx
486                    .insert_entry(&IncomingContractKey(contract.contract_id()), contract)
487                    .await
488                    .is_some()
489                {
490                    return Err(LightningOutputError::ContractAlreadyExists);
491                }
492
493                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
494
495                dbtx.insert_entry(&DecryptionKeyShareKey(contract.contract_id()), &dk_share)
496                    .await;
497
498                contract.commitment.amount
499            }
500        };
501
502        Ok(TransactionItemAmount {
503            amount,
504            fee: self.cfg.consensus.fee_consensus.fee(amount),
505        })
506    }
507
508    async fn output_status(
509        &self,
510        _dbtx: &mut DatabaseTransaction<'_>,
511        _out_point: OutPoint,
512    ) -> Option<LightningOutputOutcome> {
513        None
514    }
515
516    async fn audit(
517        &self,
518        dbtx: &mut DatabaseTransaction<'_>,
519        audit: &mut Audit,
520        module_instance_id: ModuleInstanceId,
521    ) {
522        // Both incoming and outgoing contracts represent liabilities to the federation
523        // since they are obligations to issue notes.
524        audit
525            .add_items(
526                dbtx,
527                module_instance_id,
528                &OutgoingContractPrefix,
529                |_, contract| -(contract.amount.msats as i64),
530            )
531            .await;
532
533        audit
534            .add_items(
535                dbtx,
536                module_instance_id,
537                &IncomingContractPrefix,
538                |_, contract| -(contract.commitment.amount.msats as i64),
539            )
540            .await;
541    }
542
543    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
544        vec![
545            api_endpoint! {
546                CONSENSUS_BLOCK_COUNT_ENDPOINT,
547                ApiVersion::new(0, 0),
548                async |module: &Lightning, context, _params : () | -> u64 {
549                    let db = context.db();
550                    let mut dbtx = db.begin_transaction_nc().await;
551
552                    Ok(module.consensus_block_count(&mut dbtx).await)
553                }
554            },
555            api_endpoint! {
556                AWAIT_INCOMING_CONTRACT_ENDPOINT,
557                ApiVersion::new(0, 0),
558                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<ContractId> {
559                    let db = context.db();
560
561                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
562                }
563            },
564            api_endpoint! {
565                AWAIT_PREIMAGE_ENDPOINT,
566                ApiVersion::new(0, 0),
567                async |module: &Lightning, context, params: (ContractId, u64)| -> Option<[u8; 32]> {
568                    let db = context.db();
569
570                    Ok(module.await_preimage(db, params.0, params.1).await)
571                }
572            },
573            api_endpoint! {
574                DECRYPTION_KEY_SHARE_ENDPOINT,
575                ApiVersion::new(0, 0),
576                async |_module: &Lightning, context, params: ContractId| -> DecryptionKeyShare {
577                    let share = context
578                        .db()
579                        .begin_transaction_nc()
580                        .await
581                        .get_value(&DecryptionKeyShareKey(params))
582                        .await
583                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
584
585                    Ok(share)
586                }
587            },
588            api_endpoint! {
589                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
590                ApiVersion::new(0, 0),
591                async |module: &Lightning, context, contract_id: ContractId| -> Option<u64> {
592                    let db = context.db();
593
594                    Ok(module.outgoing_contract_expiration(db, contract_id).await)
595                }
596            },
597            api_endpoint! {
598                ADD_GATEWAY_ENDPOINT,
599                ApiVersion::new(0, 0),
600                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
601                    check_auth(context)?;
602
603                    let db = context.db();
604
605                    Ok(Lightning::add_gateway(db, gateway).await)
606                }
607            },
608            api_endpoint! {
609                REMOVE_GATEWAY_ENDPOINT,
610                ApiVersion::new(0, 0),
611                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
612                    check_auth(context)?;
613
614                    let db = context.db();
615
616                    Ok(Lightning::remove_gateway(db, gateway).await)
617                }
618            },
619            api_endpoint! {
620                GATEWAYS_ENDPOINT,
621                ApiVersion::new(0, 0),
622                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
623                    let db = context.db();
624
625                    Ok(Lightning::gateways(db).await)
626                }
627            },
628        ]
629    }
630}
631
632impl Lightning {
633    fn new(cfg: LightningConfig, task_group: &TaskGroup) -> anyhow::Result<Self> {
634        let btc_rpc = create_bitcoind(&cfg.local.bitcoin_rpc)?;
635        let block_count_rx = btc_rpc.spawn_block_count_update_task(task_group)?;
636
637        Ok(Lightning {
638            cfg,
639            block_count_rx,
640        })
641    }
642
643    fn get_block_count(&self) -> anyhow::Result<u64> {
644        self.block_count_rx
645            .borrow()
646            .ok_or_else(|| format_err!("Block count not available yet"))
647    }
648
649    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
650        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
651
652        let mut counts = dbtx
653            .find_by_prefix(&BlockCountVotePrefix)
654            .await
655            .map(|entry| entry.1)
656            .collect::<Vec<u64>>()
657            .await;
658
659        counts.sort_unstable();
660
661        counts.reverse();
662
663        assert!(counts.last() <= counts.first());
664
665        // The block count we select guarantees that any threshold of correct peers can
666        // increase the consensus block count and any consensus block count has been
667        // confirmed by a threshold of peers.
668
669        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
670    }
671
672    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
673        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
674
675        let mut times = dbtx
676            .find_by_prefix(&UnixTimeVotePrefix)
677            .await
678            .map(|entry| entry.1)
679            .collect::<Vec<u64>>()
680            .await;
681
682        times.sort_unstable();
683
684        times.reverse();
685
686        assert!(times.last() <= times.first());
687
688        // The unix time we select guarantees that any threshold of correct peers can
689        // advance the consensus unix time and any consensus unix time has been
690        // confirmed by a threshold of peers.
691
692        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
693    }
694
695    async fn await_incoming_contract(
696        &self,
697        db: Database,
698        contract_id: ContractId,
699        expiration: u64,
700    ) -> Option<ContractId> {
701        loop {
702            timeout(
703                Duration::from_secs(10),
704                db.wait_key_exists(&IncomingContractKey(contract_id)),
705            )
706            .await
707            .ok();
708
709            // to avoid race conditions we have to check for the contract and
710            // its expiration in the same database transaction
711            let mut dbtx = db.begin_transaction_nc().await;
712
713            if let Some(contract) = dbtx.get_value(&IncomingContractKey(contract_id)).await {
714                return Some(contract.contract_id());
715            }
716
717            if expiration <= self.consensus_unix_time(&mut dbtx).await {
718                return None;
719            }
720        }
721    }
722
723    async fn await_preimage(
724        &self,
725        db: Database,
726        contract_id: ContractId,
727        expiration: u64,
728    ) -> Option<[u8; 32]> {
729        loop {
730            timeout(
731                Duration::from_secs(10),
732                db.wait_key_exists(&PreimageKey(contract_id)),
733            )
734            .await
735            .ok();
736
737            // to avoid race conditions we have to check for the preimage and
738            // the contracts expiration in the same database transaction
739            let mut dbtx = db.begin_transaction_nc().await;
740
741            if let Some(preimage) = dbtx.get_value(&PreimageKey(contract_id)).await {
742                return Some(preimage);
743            }
744
745            if expiration <= self.consensus_block_count(&mut dbtx).await {
746                return None;
747            }
748        }
749    }
750
751    async fn outgoing_contract_expiration(
752        &self,
753        db: Database,
754        contract_id: ContractId,
755    ) -> Option<u64> {
756        let mut dbtx = db.begin_transaction_nc().await;
757
758        let contract = dbtx.get_value(&OutgoingContractKey(contract_id)).await?;
759
760        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
761
762        Some(contract.expiration.saturating_sub(consensus_block_count))
763    }
764
765    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
766        let mut dbtx = db.begin_transaction().await;
767
768        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
769
770        dbtx.commit_tx().await;
771
772        is_new_entry
773    }
774
775    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
776        let mut dbtx = db.begin_transaction().await;
777
778        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
779
780        dbtx.commit_tx().await;
781
782        entry_existed
783    }
784
785    async fn gateways(db: Database) -> Vec<SafeUrl> {
786        db.begin_transaction_nc()
787            .await
788            .find_by_prefix(&GatewayPrefix)
789            .await
790            .map(|entry| entry.0 .0)
791            .collect()
792            .await
793    }
794}