fedimint_server/net/api/
announcement.rs1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::net::Connector;
5use fedimint_api_client::api::DynGlobalApi;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::net::api_announcement::{
9 override_api_urls, ApiAnnouncement, SignedApiAnnouncement,
10};
11use fedimint_core::task::{sleep, TaskGroup};
12use fedimint_core::util::SafeUrl;
13use fedimint_core::{impl_db_lookup, impl_db_record, secp256k1, PeerId};
14use tokio::select;
15use tracing::debug;
16
17use crate::config::{ServerConfig, ServerConfigConsensus};
18use crate::consensus::db::DbKeyPrefix;
19
20#[derive(Clone, Debug, Encodable, Decodable)]
21pub struct ApiAnnouncementKey(pub PeerId);
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct ApiAnnouncementPrefix;
25
26impl_db_record!(
27 key = ApiAnnouncementKey,
28 value = SignedApiAnnouncement,
29 db_prefix = DbKeyPrefix::ApiAnnouncements,
30 notify_on_modify = true,
31);
32impl_db_lookup!(
33 key = ApiAnnouncementKey,
34 query_prefix = ApiAnnouncementPrefix
35);
36
37pub async fn start_api_announcement_service(
38 db: &Database,
39 tg: &TaskGroup,
40 cfg: &ServerConfig,
41 api_secret: Option<String>,
42) {
43 const INITIAL_DEALY_SECONDS: u64 = 5;
44 const FAILURE_RETRY_SECONDS: u64 = 60;
45 const SUCCESS_RETRY_SECONDS: u64 = 600;
46
47 insert_signed_api_announcement_if_not_present(db, cfg).await;
48
49 let db = db.clone();
50 let api_client = DynGlobalApi::from_endpoints(
52 get_api_urls(&db, &cfg.consensus).await,
53 &api_secret,
54 &Connector::default(),
55 );
56 let our_peer_id = cfg.local.identity;
57 tg.spawn_cancellable("submit-api-url-announcement", async move {
58 sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
61 loop {
62 let announcement = db.begin_transaction_nc()
63 .await
64 .get_value(&ApiAnnouncementKey(our_peer_id))
65 .await
66 .expect("Our own API announcement should be present in the database");
67
68 if let Err(e) = api_client
69 .submit_api_announcement(our_peer_id, announcement.clone())
70 .await
71 {
72 debug!(?e, "Announcing our API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
73 sleep(Duration::from_secs(FAILURE_RETRY_SECONDS)).await;
74 } else {
75 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
76 let new_announcement = db.wait_key_check(
77 &our_announcement_key,
78 |new_announcement| {
79 new_announcement.and_then(
80 |new_announcement| (new_announcement.api_announcement.nonce != announcement.api_announcement.nonce).then_some(())
81 )
82 });
83
84 select! {
85 _ = new_announcement => {},
86 () = sleep(Duration::from_secs(SUCCESS_RETRY_SECONDS)) => {},
87 }
88 }
89 }
90 });
91}
92
93async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
96 let mut dbtx = db.begin_transaction().await;
97 if dbtx
98 .get_value(&ApiAnnouncementKey(cfg.local.identity))
99 .await
100 .is_some()
101 {
102 return;
103 }
104
105 let api_announcement = ApiAnnouncement::new(
106 cfg.consensus.api_endpoints[&cfg.local.identity].url.clone(),
107 0,
108 );
109 let ctx = secp256k1::Secp256k1::new();
110 let signed_announcement =
111 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
112
113 dbtx.insert_entry(
114 &ApiAnnouncementKey(cfg.local.identity),
115 &signed_announcement,
116 )
117 .await;
118 dbtx.commit_tx().await;
119}
120
121pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
125 override_api_urls(
126 db,
127 cfg.api_endpoints
128 .iter()
129 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
130 &ApiAnnouncementPrefix,
131 |key| key.0,
132 )
133 .await
134}