fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::net::Connector;
5use fedimint_api_client::api::DynGlobalApi;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::net::api_announcement::{
9    override_api_urls, ApiAnnouncement, SignedApiAnnouncement,
10};
11use fedimint_core::task::{sleep, TaskGroup};
12use fedimint_core::util::SafeUrl;
13use fedimint_core::{impl_db_lookup, impl_db_record, secp256k1, PeerId};
14use fedimint_logging::LOG_NET_API;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::consensus::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
27impl_db_record!(
28    key = ApiAnnouncementKey,
29    value = SignedApiAnnouncement,
30    db_prefix = DbKeyPrefix::ApiAnnouncements,
31    notify_on_modify = true,
32);
33impl_db_lookup!(
34    key = ApiAnnouncementKey,
35    query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn start_api_announcement_service(
39    db: &Database,
40    tg: &TaskGroup,
41    cfg: &ServerConfig,
42    api_secret: Option<String>,
43) {
44    const INITIAL_DEALY_SECONDS: u64 = 5;
45    const FAILURE_RETRY_SECONDS: u64 = 60;
46    const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48    insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50    let db = db.clone();
51    // FIXME: (@leonardo) how should we handle the connector here ?
52    let api_client = DynGlobalApi::from_endpoints(
53        get_api_urls(&db, &cfg.consensus).await,
54        &api_secret,
55        &Connector::default(),
56    );
57    let our_peer_id = cfg.local.identity;
58    tg.spawn_cancellable("submit-api-url-announcement", async move {
59        // Give other servers some time to start up in case they were just restarted
60        // together
61        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
62        loop {
63            let announcement = db.begin_transaction_nc()
64                .await
65                .get_value(&ApiAnnouncementKey(our_peer_id))
66                .await
67                .expect("Our own API announcement should be present in the database");
68
69            if let Err(e) = api_client
70                .submit_api_announcement(our_peer_id, announcement.clone())
71                .await
72            {
73                debug!(target: LOG_NET_API, ?e, "Announcing our API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
74                sleep(Duration::from_secs(FAILURE_RETRY_SECONDS)).await;
75            } else {
76                let our_announcement_key = ApiAnnouncementKey(our_peer_id);
77                let new_announcement = db.wait_key_check(
78                    &our_announcement_key,
79                    |new_announcement| {
80                        new_announcement.and_then(
81                            |new_announcement| (new_announcement.api_announcement.nonce != announcement.api_announcement.nonce).then_some(())
82                        )
83                    });
84
85                select! {
86                    _ = new_announcement => {},
87                    () = sleep(Duration::from_secs(SUCCESS_RETRY_SECONDS)) => {},
88                }
89            }
90        }
91    });
92}
93
94/// Checks if we already have a signed API endpoint announcement for our own
95/// identity in the database and creates one if not.
96async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
97    let mut dbtx = db.begin_transaction().await;
98    if dbtx
99        .get_value(&ApiAnnouncementKey(cfg.local.identity))
100        .await
101        .is_some()
102    {
103        return;
104    }
105
106    let api_announcement = ApiAnnouncement::new(
107        cfg.consensus.api_endpoints[&cfg.local.identity].url.clone(),
108        0,
109    );
110    let ctx = secp256k1::Secp256k1::new();
111    let signed_announcement =
112        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
113
114    dbtx.insert_entry(
115        &ApiAnnouncementKey(cfg.local.identity),
116        &signed_announcement,
117    )
118    .await;
119    dbtx.commit_tx().await;
120}
121
122/// Returns a list of all peers and their respective API URLs taking into
123/// account announcements overwriting the URLs contained in the original
124/// configuration.
125pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
126    override_api_urls(
127        db,
128        cfg.api_endpoints
129            .iter()
130            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
131        &ApiAnnouncementPrefix,
132        |key| key.0,
133    )
134    .await
135}