1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::result;
5use std::sync::Arc;
6
7use anyhow::anyhow;
8use bitcoin::hashes::sha256;
9use bitcoin::secp256k1;
10use fedimint_core::admin_client::{
11 ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
12};
13use fedimint_core::backup::ClientBackupSnapshot;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::core::ModuleInstanceId;
16use fedimint_core::endpoint_constants::{
17 ADD_CONFIG_GEN_PEER_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
18 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
19 CONFIG_GEN_PEERS_ENDPOINT, CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
20 DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
21 GUARDIAN_CONFIG_BACKUP_ENDPOINT, RECOVER_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
22 RUN_DKG_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT,
23 SESSION_STATUS_ENDPOINT, SET_CONFIG_GEN_CONNECTIONS_ENDPOINT, SET_CONFIG_GEN_PARAMS_ENDPOINT,
24 SET_PASSWORD_ENDPOINT, SHUTDOWN_ENDPOINT, SIGN_API_ANNOUNCEMENT_ENDPOINT,
25 START_CONSENSUS_ENDPOINT, STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
26 SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT, VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::module::audit::AuditSummary;
29use fedimint_core::module::registry::ModuleDecoderRegistry;
30use fedimint_core::module::{ApiAuth, ApiRequestErased, SerdeModuleEncoding};
31use fedimint_core::net::api_announcement::{
32 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
33};
34use fedimint_core::session_outcome::{AcceptedItem, SessionOutcome, SessionStatus};
35use fedimint_core::task::{MaybeSend, MaybeSync};
36use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
37use fedimint_core::util::SafeUrl;
38use fedimint_core::{apply, async_trait_maybe_send, NumPeersExt, PeerId, TransactionId};
39use fedimint_logging::LOG_CLIENT_NET_API;
40use futures::future::join_all;
41use jsonrpsee_core::client::Error as JsonRpcClientError;
42use serde_json::Value;
43use tokio::sync::OnceCell;
44use tracing::debug;
45
46use super::{
47 DynModuleApi, FederationApiExt, FederationError, FederationResult, GuardianConfigBackup,
48 IGlobalFederationApi, IRawFederationApi, PeerResult, StatusResponse,
49};
50use crate::query::FilterMapThreshold;
51
52pub trait GlobalFederationApiWithCacheExt
55where
56 Self: Sized,
57{
58 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
59}
60
61impl<T> GlobalFederationApiWithCacheExt for T
62where
63 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
64{
65 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
66 GlobalFederationApiWithCache::new(self)
67 }
68}
69
70#[derive(Debug)]
75pub struct GlobalFederationApiWithCache<T> {
76 inner: T,
77 await_session_lru: Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
87
88 get_session_status_lru:
96 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
97}
98
99impl<T> GlobalFederationApiWithCache<T> {
100 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
101 Self {
102 inner,
103 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
104 NonZeroUsize::new(512).expect("is non-zero"),
105 ))),
106 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
107 NonZeroUsize::new(512).expect("is non-zero"),
108 ))),
109 }
110 }
111}
112
113impl<T> GlobalFederationApiWithCache<T>
114where
115 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
116{
117 async fn await_block_raw(
118 &self,
119 block_index: u64,
120 decoders: &ModuleDecoderRegistry,
121 ) -> anyhow::Result<SessionOutcome> {
122 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
123 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
124 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
125 ApiRequestErased::new(block_index),
126 )
127 .await?
128 .try_into_inner(decoders)
129 .map_err(|e| anyhow!(e.to_string()))
130 }
131
132 async fn get_session_status_raw(
133 &self,
134 block_index: u64,
135 decoders: &ModuleDecoderRegistry,
136 ) -> anyhow::Result<SessionStatus> {
137 debug!(target: LOG_CLIENT_NET_API, block_index, "Fetching block's outcome from Federation");
138 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
139 SESSION_STATUS_ENDPOINT.to_string(),
140 ApiRequestErased::new(block_index),
141 )
142 .await?
143 .try_into_inner(&decoders.clone().with_fallback())
144 .map_err(|e| anyhow!(e))
145 }
146}
147
148#[apply(async_trait_maybe_send!)]
149impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
150where
151 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
152{
153 fn all_peers(&self) -> &BTreeSet<PeerId> {
154 self.inner.all_peers()
155 }
156
157 fn self_peer(&self) -> Option<PeerId> {
158 self.inner.self_peer()
159 }
160
161 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
162 self.inner.with_module(id)
163 }
164
165 async fn request_raw(
167 &self,
168 peer_id: PeerId,
169 method: &str,
170 params: &[Value],
171 ) -> result::Result<Value, JsonRpcClientError> {
172 self.inner.request_raw(peer_id, method, params).await
173 }
174}
175
176#[apply(async_trait_maybe_send!)]
177impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
178where
179 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
180{
181 async fn await_block(
182 &self,
183 session_idx: u64,
184 decoders: &ModuleDecoderRegistry,
185 ) -> anyhow::Result<SessionOutcome> {
186 let mut lru_lock = self.await_session_lru.lock().await;
187
188 let entry_arc = lru_lock
189 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
190 .clone();
191
192 drop(lru_lock);
194
195 entry_arc
196 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
197 .await
198 .cloned()
199 }
200
201 async fn get_session_status(
202 &self,
203 session_idx: u64,
204 decoders: &ModuleDecoderRegistry,
205 ) -> anyhow::Result<SessionStatus> {
206 let mut lru_lock = self.get_session_status_lru.lock().await;
207
208 let entry_arc = lru_lock
209 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
210 .clone();
211
212 drop(lru_lock);
214
215 enum NoCacheErr {
216 Initial,
217 Pending(Vec<AcceptedItem>),
218 Err(anyhow::Error),
219 }
220 match entry_arc
221 .get_or_try_init(|| async {
222 match self.get_session_status_raw(session_idx, decoders).await {
223 Err(e) => Err(NoCacheErr::Err(e)),
224 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
225 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
226 Ok(SessionStatus::Complete(s)) => Ok(s),
228 }
229 })
230 .await
231 .cloned()
232 {
233 Ok(s) => Ok(SessionStatus::Complete(s)),
234 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
235 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
236 Err(NoCacheErr::Err(e)) => Err(e),
237 }
238 }
239
240 async fn submit_transaction(
242 &self,
243 tx: Transaction,
244 ) -> FederationResult<SerdeModuleEncoding<TransactionSubmissionOutcome>> {
245 self.request_current_consensus(
246 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
247 ApiRequestErased::new(SerdeTransaction::from(&tx)),
248 )
249 .await
250 }
251
252 async fn session_count(&self) -> FederationResult<u64> {
253 self.request_current_consensus(
254 SESSION_COUNT_ENDPOINT.to_owned(),
255 ApiRequestErased::default(),
256 )
257 .await
258 }
259
260 async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId> {
261 self.request_current_consensus(
262 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
263 ApiRequestErased::new(txid),
264 )
265 .await
266 }
267
268 async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash> {
269 self.request_current_consensus(
270 SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT.to_owned(),
271 ApiRequestErased::default(),
272 )
273 .await
274 }
275
276 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
277 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
278 .await
279 }
280
281 async fn download_backup(
282 &self,
283 id: &secp256k1::PublicKey,
284 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
285 self.request_with_strategy(
286 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
287 RECOVER_ENDPOINT.to_owned(),
288 ApiRequestErased::new(id),
289 )
290 .await
291 }
292
293 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
294 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
295 .await
296 }
297
298 async fn set_config_gen_connections(
299 &self,
300 info: ConfigGenConnectionsRequest,
301 auth: ApiAuth,
302 ) -> FederationResult<()> {
303 self.request_admin(
304 SET_CONFIG_GEN_CONNECTIONS_ENDPOINT,
305 ApiRequestErased::new(info),
306 auth,
307 )
308 .await
309 }
310
311 async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()> {
312 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
313 .await
314 }
315
316 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>> {
317 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
318 .await
319 }
320
321 async fn get_default_config_gen_params(
322 &self,
323 auth: ApiAuth,
324 ) -> FederationResult<ConfigGenParamsRequest> {
325 self.request_admin(
326 DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT,
327 ApiRequestErased::default(),
328 auth,
329 )
330 .await
331 }
332
333 async fn set_config_gen_params(
334 &self,
335 requested: ConfigGenParamsRequest,
336 auth: ApiAuth,
337 ) -> FederationResult<()> {
338 self.request_admin(
339 SET_CONFIG_GEN_PARAMS_ENDPOINT,
340 ApiRequestErased::new(requested),
341 auth,
342 )
343 .await
344 }
345
346 async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse> {
347 self.request_admin_no_auth(
348 CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
349 ApiRequestErased::default(),
350 )
351 .await
352 }
353
354 async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
355 self.request_admin(RUN_DKG_ENDPOINT, ApiRequestErased::default(), auth)
356 .await
357 }
358
359 async fn get_verify_config_hash(
360 &self,
361 auth: ApiAuth,
362 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
363 self.request_admin(
364 VERIFY_CONFIG_HASH_ENDPOINT,
365 ApiRequestErased::default(),
366 auth,
367 )
368 .await
369 }
370
371 async fn verified_configs(
372 &self,
373 auth: ApiAuth,
374 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
375 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
376 .await
377 }
378
379 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
380 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
381 .await
382 }
383
384 async fn status(&self) -> FederationResult<StatusResponse> {
385 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
386 .await
387 }
388
389 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
390 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
391 .await
392 }
393
394 async fn guardian_config_backup(
395 &self,
396 auth: ApiAuth,
397 ) -> FederationResult<GuardianConfigBackup> {
398 self.request_admin(
399 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
400 ApiRequestErased::default(),
401 auth,
402 )
403 .await
404 }
405
406 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
407 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
408 .await
409 }
410
411 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
412 self.request_admin(
413 RESTART_FEDERATION_SETUP_ENDPOINT,
414 ApiRequestErased::default(),
415 auth,
416 )
417 .await
418 }
419
420 async fn submit_api_announcement(
421 &self,
422 announcement_peer_id: PeerId,
423 announcement: SignedApiAnnouncement,
424 ) -> FederationResult<()> {
425 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
426 let announcement_inner = announcement.clone();
427 async move {
428 (
429 peer_id,
430 self.request_single_peer(
431 None,
432 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
433 ApiRequestErased::new(SignedApiAnnouncementSubmission {
434 signed_api_announcement: announcement_inner,
435 peer_id: announcement_peer_id,
436 }),
437 peer_id,
438 )
439 .await,
440 )
441 }
442 }))
443 .await
444 .into_iter()
445 .filter_map(|(peer_id, result)| match result {
446 Ok(_) => None,
447 Err(e) => Some((peer_id, e.into())),
448 })
449 .collect::<BTreeMap<_, _>>();
450
451 if peer_errors.is_empty() {
452 Ok(())
453 } else {
454 Err(FederationError {
455 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
456 params: serde_json::to_value(announcement).expect("can be serialized"),
457 general: None,
458 peers: peer_errors,
459 })
460 }
461 }
462
463 async fn api_announcements(
464 &self,
465 guardian: PeerId,
466 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
467 self.request_single_peer_typed(
468 None,
469 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
470 ApiRequestErased::default(),
471 guardian,
472 )
473 .await
474 }
475
476 async fn sign_api_announcement(
477 &self,
478 api_url: SafeUrl,
479 auth: ApiAuth,
480 ) -> FederationResult<SignedApiAnnouncement> {
481 self.request_admin(
482 SIGN_API_ANNOUNCEMENT_ENDPOINT,
483 ApiRequestErased::new(api_url),
484 auth,
485 )
486 .await
487 }
488
489 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
490 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
491 .await
492 }
493
494 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
495 self.request_single_peer_typed(
496 None,
497 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
498 ApiRequestErased::default(),
499 peer_id,
500 )
501 .await
502 }
503}