fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::net::Connector;
5use fedimint_api_client::api::DynGlobalApi;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::net::api_announcement::{
9    override_api_urls, ApiAnnouncement, SignedApiAnnouncement,
10};
11use fedimint_core::task::{sleep, TaskGroup};
12use fedimint_core::util::SafeUrl;
13use fedimint_core::{impl_db_lookup, impl_db_record, secp256k1, PeerId};
14use tokio::select;
15use tracing::debug;
16
17use crate::config::{ServerConfig, ServerConfigConsensus};
18use crate::consensus::db::DbKeyPrefix;
19
20#[derive(Clone, Debug, Encodable, Decodable)]
21pub struct ApiAnnouncementKey(pub PeerId);
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct ApiAnnouncementPrefix;
25
26impl_db_record!(
27    key = ApiAnnouncementKey,
28    value = SignedApiAnnouncement,
29    db_prefix = DbKeyPrefix::ApiAnnouncements,
30    notify_on_modify = true,
31);
32impl_db_lookup!(
33    key = ApiAnnouncementKey,
34    query_prefix = ApiAnnouncementPrefix
35);
36
37pub async fn start_api_announcement_service(
38    db: &Database,
39    tg: &TaskGroup,
40    cfg: &ServerConfig,
41    api_secret: Option<String>,
42) {
43    const INITIAL_DEALY_SECONDS: u64 = 5;
44    const FAILURE_RETRY_SECONDS: u64 = 60;
45    const SUCCESS_RETRY_SECONDS: u64 = 600;
46
47    insert_signed_api_announcement_if_not_present(db, cfg).await;
48
49    let db = db.clone();
50    // FIXME: (@leonardo) how should we handle the connector here ?
51    let api_client = DynGlobalApi::from_endpoints(
52        get_api_urls(&db, &cfg.consensus).await,
53        &api_secret,
54        &Connector::default(),
55    );
56    let our_peer_id = cfg.local.identity;
57    tg.spawn_cancellable("submit-api-url-announcement", async move {
58        // Give other servers some time to start up in case they were just restarted
59        // together
60        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
61        loop {
62            let announcement = db.begin_transaction_nc()
63                .await
64                .get_value(&ApiAnnouncementKey(our_peer_id))
65                .await
66                .expect("Our own API announcement should be present in the database");
67
68            if let Err(e) = api_client
69                .submit_api_announcement(our_peer_id, announcement.clone())
70                .await
71            {
72                debug!(?e, "Announcing our API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
73                sleep(Duration::from_secs(FAILURE_RETRY_SECONDS)).await;
74            } else {
75                let our_announcement_key = ApiAnnouncementKey(our_peer_id);
76                let new_announcement = db.wait_key_check(
77                    &our_announcement_key,
78                    |new_announcement| {
79                        new_announcement.and_then(
80                            |new_announcement| (new_announcement.api_announcement.nonce != announcement.api_announcement.nonce).then_some(())
81                        )
82                    });
83
84                select! {
85                    _ = new_announcement => {},
86                    () = sleep(Duration::from_secs(SUCCESS_RETRY_SECONDS)) => {},
87                }
88            }
89        }
90    });
91}
92
93/// Checks if we already have a signed API endpoint announcement for our own
94/// identity in the database and creates one if not.
95async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
96    let mut dbtx = db.begin_transaction().await;
97    if dbtx
98        .get_value(&ApiAnnouncementKey(cfg.local.identity))
99        .await
100        .is_some()
101    {
102        return;
103    }
104
105    let api_announcement = ApiAnnouncement::new(
106        cfg.consensus.api_endpoints[&cfg.local.identity].url.clone(),
107        0,
108    );
109    let ctx = secp256k1::Secp256k1::new();
110    let signed_announcement =
111        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
112
113    dbtx.insert_entry(
114        &ApiAnnouncementKey(cfg.local.identity),
115        &signed_announcement,
116    )
117    .await;
118    dbtx.commit_tx().await;
119}
120
121/// Returns a list of all peers and their respective API URLs taking into
122/// account announcements overwriting the URLs contained in the original
123/// configuration.
124pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
125    override_api_urls(
126        db,
127        cfg.api_endpoints
128            .iter()
129            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
130        &ApiAnnouncementPrefix,
131        |key| key.0,
132    )
133    .await
134}