fedimint_client/
api_announcements.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{bail, Context};
6use fedimint_core::config::ClientConfig;
7use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::encoding::{Decodable, Encodable};
9use fedimint_core::net::api_announcement::{override_api_urls, SignedApiAnnouncement};
10use fedimint_core::runtime::sleep;
11use fedimint_core::secp256k1::SECP256K1;
12use fedimint_core::util::{backoff_util, retry, SafeUrl};
13use fedimint_core::{impl_db_lookup, impl_db_record, PeerId};
14use fedimint_logging::LOG_CLIENT;
15use futures::future::join_all;
16use tracing::{debug, warn};
17
18use crate::db::DbKeyPrefix;
19use crate::Client;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
27impl_db_record!(
28 key = ApiAnnouncementKey,
29 value = SignedApiAnnouncement,
30 db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
31 notify_on_modify = false,
32);
33impl_db_lookup!(
34 key = ApiAnnouncementKey,
35 query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn run_api_announcement_sync(client_inner: Arc<Client>) {
41 let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
43 loop {
44 let results = join_all(client_inner.api.all_peers().iter()
45 .map(|peer_id| async {
46 let peer_id = *peer_id;
47 let announcements =
48
49 retry(
50 "Fetch api announcement (sync)",
51 backoff_util::aggressive_backoff(),
52 || async {
53 client_inner
54 .api
55 .api_announcements(peer_id)
56 .await
57 .with_context(move || format!("Fetching API announcements from peer {peer_id} failed"))
58 },
59 )
60 .await?;
61
62 for (peer_id, announcement) in &announcements {
65 let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
66 bail!("Guardian public key not found for peer {}", peer_id);
67 };
68
69 if !announcement.verify(SECP256K1, guardian_pub_key) {
70 bail!("Failed to verify announcement for peer {}", peer_id);
71 }
72 }
73
74 client_inner
75 .db
76 .autocommit(
77 |dbtx, _|{
78 let announcements_inner = announcements.clone();
79 Box::pin(async move {
80 for (peer, new_announcement) in announcements_inner {
81 let replace_current_announcement = dbtx
82 .get_value(&ApiAnnouncementKey(peer))
83 .await
84 .map_or(true, |current_announcement| {
85 current_announcement.api_announcement.nonce
86 < new_announcement.api_announcement.nonce
87 });
88 if replace_current_announcement {
89 debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
90 dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
91 .await;
92 }
93 }
94
95 Result::<(), ()>::Ok(())
96 })},
97 None,
98 )
99 .await
100 .expect("Will never return an error");
101
102 Ok(())
103 })).await;
104
105 for (peer_id, result) in guardian_pub_keys.keys().zip(results) {
106 if let Err(e) = result {
107 warn!(target: LOG_CLIENT, %peer_id, ?e, "Failed to process API announcements");
108 }
109 }
110
111 sleep(Duration::from_secs(3600)).await;
113 }
114}
115
116pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
120 override_api_urls(
121 db,
122 cfg.global
123 .api_endpoints
124 .iter()
125 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
126 &ApiAnnouncementPrefix,
127 |key| key.0,
128 )
129 .await
130}