1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10 ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::backup::SignedBackupRequest;
14use fedimint_core::core::ModuleInstanceId;
15use fedimint_core::endpoint_constants::{
16 ADD_CONFIG_GEN_PEER_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
17 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
18 BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT, CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
19 DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
20 GUARDIAN_CONFIG_BACKUP_ENDPOINT, RECOVER_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
21 RUN_DKG_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT,
22 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_CONFIG_GEN_CONNECTIONS_ENDPOINT,
23 SET_CONFIG_GEN_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SHUTDOWN_ENDPOINT,
24 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, STATUS_ENDPOINT,
25 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26 VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::module::audit::AuditSummary;
29use fedimint_core::module::registry::ModuleDecoderRegistry;
30use fedimint_core::module::{
31 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
32};
33use fedimint_core::net::api_announcement::{
34 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
35};
36use fedimint_core::session_outcome::{
37 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
38};
39use fedimint_core::task::{MaybeSend, MaybeSync};
40use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
41use fedimint_core::util::SafeUrl;
42use fedimint_core::{apply, async_trait_maybe_send, NumPeersExt, PeerId, TransactionId};
43use fedimint_logging::LOG_CLIENT_NET_API;
44use futures::future::join_all;
45use itertools::Itertools;
46use rand::seq::SliceRandom;
47use serde_json::Value;
48use tokio::sync::OnceCell;
49use tracing::debug;
50
51use super::{
52 DynModuleApi, FederationApiExt, FederationError, FederationResult, GuardianConfigBackup,
53 IGlobalFederationApi, IRawFederationApi, PeerResult, StatusResponse,
54};
55use crate::api::VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2;
56use crate::query::FilterMapThreshold;
57
58pub trait GlobalFederationApiWithCacheExt
61where
62 Self: Sized,
63{
64 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72 GlobalFederationApiWithCache::new(self)
73 }
74}
75
76#[derive(Debug)]
81pub struct GlobalFederationApiWithCache<T> {
82 inner: T,
83 await_session_lru: Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
93
94 get_session_status_lru:
102 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
103}
104
105impl<T> GlobalFederationApiWithCache<T> {
106 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
107 Self {
108 inner,
109 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
110 NonZeroUsize::new(512).expect("is non-zero"),
111 ))),
112 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
113 NonZeroUsize::new(512).expect("is non-zero"),
114 ))),
115 }
116 }
117}
118
119impl<T> GlobalFederationApiWithCache<T>
120where
121 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
122{
123 async fn await_block_raw(
124 &self,
125 block_index: u64,
126 decoders: &ModuleDecoderRegistry,
127 ) -> anyhow::Result<SessionOutcome> {
128 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
129 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
130 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
131 ApiRequestErased::new(block_index),
132 )
133 .await?
134 .try_into_inner(decoders)
135 .map_err(|e| anyhow!(e.to_string()))
136 }
137
138 fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
139 let mut peers = self.all_peers().iter().copied().collect_vec();
140 peers.shuffle(&mut rand::thread_rng());
141 peers.into_iter()
142 }
143
144 async fn get_session_status_raw_v2(
145 &self,
146 block_index: u64,
147 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
148 decoders: &ModuleDecoderRegistry,
149 ) -> anyhow::Result<SessionStatus> {
150 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
151 let params = ApiRequestErased::new(block_index);
152 let mut last_error = None;
153 for peer_id in self.select_peers_for_status() {
155 match self
156 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
157 SESSION_STATUS_V2_ENDPOINT.to_string(),
158 params.clone(),
159 peer_id,
160 )
161 .await
162 .map_err(anyhow::Error::from)
163 .and_then(|s| Ok(s.try_into_inner(decoders)?))
164 {
165 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
166 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
167 return Ok(SessionStatus::Complete(
169 signed_session_outcome.session_outcome,
170 ));
171 }
172 last_error = Some(format_err!("Invalid signature"));
173 }
174 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
175 return self.get_session_status_raw(block_index, decoders).await;
177 }
178 Err(err) => {
179 last_error = Some(err);
180 }
181 }
182 assert!(last_error.is_some());
184 }
185 Err(last_error.expect("must have at least one peer"))
186 }
187
188 async fn get_session_status_raw(
189 &self,
190 block_index: u64,
191 decoders: &ModuleDecoderRegistry,
192 ) -> anyhow::Result<SessionStatus> {
193 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
194 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
195 SESSION_STATUS_ENDPOINT.to_string(),
196 ApiRequestErased::new(block_index),
197 )
198 .await?
199 .try_into_inner(&decoders.clone().with_fallback())
200 .map_err(|e| anyhow!(e))
201 }
202}
203
204#[apply(async_trait_maybe_send!)]
205impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
206where
207 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
208{
209 fn all_peers(&self) -> &BTreeSet<PeerId> {
210 self.inner.all_peers()
211 }
212
213 fn self_peer(&self) -> Option<PeerId> {
214 self.inner.self_peer()
215 }
216
217 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
218 self.inner.with_module(id)
219 }
220
221 async fn request_raw(
223 &self,
224 peer_id: PeerId,
225 method: &str,
226 params: &ApiRequestErased,
227 ) -> PeerResult<Value> {
228 self.inner.request_raw(peer_id, method, params).await
229 }
230}
231
232#[apply(async_trait_maybe_send!)]
233impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
234where
235 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
236{
237 async fn await_block(
238 &self,
239 session_idx: u64,
240 decoders: &ModuleDecoderRegistry,
241 ) -> anyhow::Result<SessionOutcome> {
242 let mut lru_lock = self.await_session_lru.lock().await;
243
244 let entry_arc = lru_lock
245 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
246 .clone();
247
248 drop(lru_lock);
250
251 entry_arc
252 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
253 .await
254 .cloned()
255 }
256
257 async fn get_session_status(
258 &self,
259 session_idx: u64,
260 decoders: &ModuleDecoderRegistry,
261 core_api_version: ApiVersion,
262 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
263 ) -> anyhow::Result<SessionStatus> {
264 let mut lru_lock = self.get_session_status_lru.lock().await;
265
266 let entry_arc = lru_lock
267 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
268 .clone();
269
270 drop(lru_lock);
272
273 enum NoCacheErr {
274 Initial,
275 Pending(Vec<AcceptedItem>),
276 Err(anyhow::Error),
277 }
278 match entry_arc
279 .get_or_try_init(|| async {
280 let session_status =
281 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
282 self.get_session_status_raw(session_idx, decoders).await
283 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
284 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
285 .await
286 } else {
287 self.get_session_status_raw(session_idx, decoders).await
288 };
289 match session_status {
290 Err(e) => Err(NoCacheErr::Err(e)),
291 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
292 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
293 Ok(SessionStatus::Complete(s)) => Ok(s),
295 }
296 })
297 .await
298 .cloned()
299 {
300 Ok(s) => Ok(SessionStatus::Complete(s)),
301 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
302 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
303 Err(NoCacheErr::Err(e)) => Err(e),
304 }
305 }
306
307 async fn submit_transaction(
308 &self,
309 tx: Transaction,
310 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
311 self.request_current_consensus_retry(
312 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
313 ApiRequestErased::new(SerdeTransaction::from(&tx)),
314 )
315 .await
316 }
317
318 async fn session_count(&self) -> FederationResult<u64> {
319 self.request_current_consensus(
320 SESSION_COUNT_ENDPOINT.to_owned(),
321 ApiRequestErased::default(),
322 )
323 .await
324 }
325
326 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
327 self.request_current_consensus_retry(
328 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
329 ApiRequestErased::new(txid),
330 )
331 .await
332 }
333
334 async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash> {
335 self.request_current_consensus(
336 SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT.to_owned(),
337 ApiRequestErased::default(),
338 )
339 .await
340 }
341
342 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
343 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
344 .await
345 }
346
347 async fn download_backup(
348 &self,
349 id: &secp256k1::PublicKey,
350 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
351 self.request_with_strategy(
352 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
353 RECOVER_ENDPOINT.to_owned(),
354 ApiRequestErased::new(id),
355 )
356 .await
357 }
358
359 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
360 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
361 .await
362 }
363
364 async fn set_config_gen_connections(
365 &self,
366 info: ConfigGenConnectionsRequest,
367 auth: ApiAuth,
368 ) -> FederationResult<()> {
369 self.request_admin(
370 SET_CONFIG_GEN_CONNECTIONS_ENDPOINT,
371 ApiRequestErased::new(info),
372 auth,
373 )
374 .await
375 }
376
377 async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()> {
378 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
379 .await
380 }
381
382 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>> {
383 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
384 .await
385 }
386
387 async fn get_default_config_gen_params(
388 &self,
389 auth: ApiAuth,
390 ) -> FederationResult<ConfigGenParamsRequest> {
391 self.request_admin(
392 DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT,
393 ApiRequestErased::default(),
394 auth,
395 )
396 .await
397 }
398
399 async fn set_config_gen_params(
400 &self,
401 requested: ConfigGenParamsRequest,
402 auth: ApiAuth,
403 ) -> FederationResult<()> {
404 self.request_admin(
405 SET_CONFIG_GEN_PARAMS_ENDPOINT,
406 ApiRequestErased::new(requested),
407 auth,
408 )
409 .await
410 }
411
412 async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse> {
413 self.request_admin_no_auth(
414 CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
415 ApiRequestErased::default(),
416 )
417 .await
418 }
419
420 async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
421 self.request_admin(RUN_DKG_ENDPOINT, ApiRequestErased::default(), auth)
422 .await
423 }
424
425 async fn get_verify_config_hash(
426 &self,
427 auth: ApiAuth,
428 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
429 self.request_admin(
430 VERIFY_CONFIG_HASH_ENDPOINT,
431 ApiRequestErased::default(),
432 auth,
433 )
434 .await
435 }
436
437 async fn verified_configs(
438 &self,
439 auth: ApiAuth,
440 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
441 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
442 .await
443 }
444
445 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
446 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
447 .await
448 }
449
450 async fn status(&self) -> FederationResult<StatusResponse> {
451 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
452 .await
453 }
454
455 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
456 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
457 .await
458 }
459
460 async fn guardian_config_backup(
461 &self,
462 auth: ApiAuth,
463 ) -> FederationResult<GuardianConfigBackup> {
464 self.request_admin(
465 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
466 ApiRequestErased::default(),
467 auth,
468 )
469 .await
470 }
471
472 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
473 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
474 .await
475 }
476
477 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
478 self.request_admin(
479 RESTART_FEDERATION_SETUP_ENDPOINT,
480 ApiRequestErased::default(),
481 auth,
482 )
483 .await
484 }
485
486 async fn submit_api_announcement(
487 &self,
488 announcement_peer_id: PeerId,
489 announcement: SignedApiAnnouncement,
490 ) -> FederationResult<()> {
491 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
492 let announcement_inner = announcement.clone();
493 async move {
494 (
495 peer_id,
496 self.request_single_peer::<()>(
497 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
498 ApiRequestErased::new(SignedApiAnnouncementSubmission {
499 signed_api_announcement: announcement_inner,
500 peer_id: announcement_peer_id,
501 }),
502 peer_id,
503 )
504 .await,
505 )
506 }
507 }))
508 .await
509 .into_iter()
510 .filter_map(|(peer_id, result)| match result {
511 Ok(()) => None,
512 Err(e) => Some((peer_id, e)),
513 })
514 .collect::<BTreeMap<_, _>>();
515
516 if peer_errors.is_empty() {
517 Ok(())
518 } else {
519 Err(FederationError {
520 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
521 params: serde_json::to_value(announcement).expect("can be serialized"),
522 general: None,
523 peer_errors,
524 })
525 }
526 }
527
528 async fn api_announcements(
529 &self,
530 guardian: PeerId,
531 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
532 self.request_single_peer(
533 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
534 ApiRequestErased::default(),
535 guardian,
536 )
537 .await
538 }
539
540 async fn sign_api_announcement(
541 &self,
542 api_url: SafeUrl,
543 auth: ApiAuth,
544 ) -> FederationResult<SignedApiAnnouncement> {
545 self.request_admin(
546 SIGN_API_ANNOUNCEMENT_ENDPOINT,
547 ApiRequestErased::new(api_url),
548 auth,
549 )
550 .await
551 }
552
553 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
554 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
555 .await
556 }
557
558 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
559 self.request_admin(
560 BACKUP_STATISTICS_ENDPOINT,
561 ApiRequestErased::default(),
562 auth,
563 )
564 .await
565 }
566
567 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
568 self.request_single_peer(
569 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
570 ApiRequestErased::default(),
571 peer_id,
572 )
573 .await
574 }
575}