fedimint_api_client/api/
global_api.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10    ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::backup::SignedBackupRequest;
14use fedimint_core::core::ModuleInstanceId;
15use fedimint_core::endpoint_constants::{
16    ADD_CONFIG_GEN_PEER_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
17    AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
18    BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT, CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
19    DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
20    GUARDIAN_CONFIG_BACKUP_ENDPOINT, RECOVER_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
21    RUN_DKG_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT,
22    SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_CONFIG_GEN_CONNECTIONS_ENDPOINT,
23    SET_CONFIG_GEN_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SHUTDOWN_ENDPOINT,
24    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, STATUS_ENDPOINT,
25    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26    VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::module::audit::AuditSummary;
29use fedimint_core::module::registry::ModuleDecoderRegistry;
30use fedimint_core::module::{
31    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
32};
33use fedimint_core::net::api_announcement::{
34    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
35};
36use fedimint_core::session_outcome::{
37    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
38};
39use fedimint_core::task::{MaybeSend, MaybeSync};
40use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
41use fedimint_core::util::SafeUrl;
42use fedimint_core::{apply, async_trait_maybe_send, NumPeersExt, PeerId, TransactionId};
43use fedimint_logging::LOG_CLIENT_NET_API;
44use futures::future::join_all;
45use itertools::Itertools;
46use rand::seq::SliceRandom;
47use serde_json::Value;
48use tokio::sync::OnceCell;
49use tracing::debug;
50
51use super::{
52    DynModuleApi, FederationApiExt, FederationError, FederationResult, GuardianConfigBackup,
53    IGlobalFederationApi, IRawFederationApi, PeerResult, StatusResponse,
54};
55use crate::api::VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2;
56use crate::query::FilterMapThreshold;
57
58/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
59/// a [`GlobalFederationApiWithCache`]
60pub trait GlobalFederationApiWithCacheExt
61where
62    Self: Sized,
63{
64    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72        GlobalFederationApiWithCache::new(self)
73    }
74}
75
/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
/// a tiny bit of caching.
///
/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
#[derive(Debug)]
pub struct GlobalFederationApiWithCache<T> {
    /// The wrapped API that all requests are forwarded to.
    inner: T,
    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache, keyed
    /// by session index.
    ///
    /// This is mostly to avoid multiple client module recovery processes
    /// re-requesting the same blocks and putting a burden on the federation.
    ///
    /// The LRU can be fairly small: if the modules are (near-)bottlenecked
    /// on fetching blocks they will naturally synchronize, or split into a
    /// handful of groups. And if they are not, no LRU here is going to help
    /// them.
    await_session_lru: Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,

    /// Like [`Self::await_session_lru`], but for
    /// [`IGlobalFederationApi::get_session_status`].
    ///
    /// In theory these two LRUs have the same content, but the entries of
    /// one are initialized by a potentially long-blocking operation, while
    /// the entries of the other are initialized by a non-blocking one.
    /// Given how tiny they are, it's not worth complicating things to unify
    /// them.
    get_session_status_lru:
        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
}
104
105impl<T> GlobalFederationApiWithCache<T> {
106    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
107        Self {
108            inner,
109            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
110                NonZeroUsize::new(512).expect("is non-zero"),
111            ))),
112            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
113                NonZeroUsize::new(512).expect("is non-zero"),
114            ))),
115        }
116    }
117}
118
119impl<T> GlobalFederationApiWithCache<T>
120where
121    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
122{
123    async fn await_block_raw(
124        &self,
125        block_index: u64,
126        decoders: &ModuleDecoderRegistry,
127    ) -> anyhow::Result<SessionOutcome> {
128        debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
129        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
130            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
131            ApiRequestErased::new(block_index),
132        )
133        .await?
134        .try_into_inner(decoders)
135        .map_err(|e| anyhow!(e.to_string()))
136    }
137
138    fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
139        let mut peers = self.all_peers().iter().copied().collect_vec();
140        peers.shuffle(&mut rand::thread_rng());
141        peers.into_iter()
142    }
143
144    async fn get_session_status_raw_v2(
145        &self,
146        block_index: u64,
147        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
148        decoders: &ModuleDecoderRegistry,
149    ) -> anyhow::Result<SessionStatus> {
150        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
151        let params = ApiRequestErased::new(block_index);
152        let mut last_error = None;
153        // fetch serially
154        for peer_id in self.select_peers_for_status() {
155            match self
156                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
157                    SESSION_STATUS_V2_ENDPOINT.to_string(),
158                    params.clone(),
159                    peer_id,
160                )
161                .await
162                .map_err(anyhow::Error::from)
163                .and_then(|s| Ok(s.try_into_inner(decoders)?))
164            {
165                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
166                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
167                        // early return
168                        return Ok(SessionStatus::Complete(
169                            signed_session_outcome.session_outcome,
170                        ));
171                    }
172                    last_error = Some(format_err!("Invalid signature"));
173                }
174                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
175                    // no signature: use fallback method
176                    return self.get_session_status_raw(block_index, decoders).await;
177                }
178                Err(err) => {
179                    last_error = Some(err);
180                }
181            }
182            // if we loop then we must have last_error
183            assert!(last_error.is_some());
184        }
185        Err(last_error.expect("must have at least one peer"))
186    }
187
188    async fn get_session_status_raw(
189        &self,
190        block_index: u64,
191        decoders: &ModuleDecoderRegistry,
192    ) -> anyhow::Result<SessionStatus> {
193        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
194        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
195            SESSION_STATUS_ENDPOINT.to_string(),
196            ApiRequestErased::new(block_index),
197        )
198        .await?
199        .try_into_inner(&decoders.clone().with_fallback())
200        .map_err(|e| anyhow!(e))
201    }
202}
203
204#[apply(async_trait_maybe_send!)]
205impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
206where
207    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
208{
209    fn all_peers(&self) -> &BTreeSet<PeerId> {
210        self.inner.all_peers()
211    }
212
213    fn self_peer(&self) -> Option<PeerId> {
214        self.inner.self_peer()
215    }
216
217    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
218        self.inner.with_module(id)
219    }
220
221    /// Make request to a specific federation peer by `peer_id`
222    async fn request_raw(
223        &self,
224        peer_id: PeerId,
225        method: &str,
226        params: &ApiRequestErased,
227    ) -> PeerResult<Value> {
228        self.inner.request_raw(peer_id, method, params).await
229    }
230}
231
232#[apply(async_trait_maybe_send!)]
233impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
234where
235    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
236{
237    async fn await_block(
238        &self,
239        session_idx: u64,
240        decoders: &ModuleDecoderRegistry,
241    ) -> anyhow::Result<SessionOutcome> {
242        let mut lru_lock = self.await_session_lru.lock().await;
243
244        let entry_arc = lru_lock
245            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
246            .clone();
247
248        // we drop the lru lock so requests for other `session_idx` can work in parallel
249        drop(lru_lock);
250
251        entry_arc
252            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
253            .await
254            .cloned()
255    }
256
257    async fn get_session_status(
258        &self,
259        session_idx: u64,
260        decoders: &ModuleDecoderRegistry,
261        core_api_version: ApiVersion,
262        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
263    ) -> anyhow::Result<SessionStatus> {
264        let mut lru_lock = self.get_session_status_lru.lock().await;
265
266        let entry_arc = lru_lock
267            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
268            .clone();
269
270        // we drop the lru lock so requests for other `session_idx` can work in parallel
271        drop(lru_lock);
272
273        enum NoCacheErr {
274            Initial,
275            Pending(Vec<AcceptedItem>),
276            Err(anyhow::Error),
277        }
278        match entry_arc
279            .get_or_try_init(|| async {
280                let session_status =
281                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
282                        self.get_session_status_raw(session_idx, decoders).await
283                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
284                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
285                            .await
286                    } else {
287                        self.get_session_status_raw(session_idx, decoders).await
288                    };
289                match session_status {
290                    Err(e) => Err(NoCacheErr::Err(e)),
291                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
292                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
293                    // only status we can cache (hance outer Ok)
294                    Ok(SessionStatus::Complete(s)) => Ok(s),
295                }
296            })
297            .await
298            .cloned()
299        {
300            Ok(s) => Ok(SessionStatus::Complete(s)),
301            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
302            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
303            Err(NoCacheErr::Err(e)) => Err(e),
304        }
305    }
306
307    async fn submit_transaction(
308        &self,
309        tx: Transaction,
310    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
311        self.request_current_consensus_retry(
312            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
313            ApiRequestErased::new(SerdeTransaction::from(&tx)),
314        )
315        .await
316    }
317
318    async fn session_count(&self) -> FederationResult<u64> {
319        self.request_current_consensus(
320            SESSION_COUNT_ENDPOINT.to_owned(),
321            ApiRequestErased::default(),
322        )
323        .await
324    }
325
326    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
327        self.request_current_consensus_retry(
328            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
329            ApiRequestErased::new(txid),
330        )
331        .await
332    }
333
334    async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash> {
335        self.request_current_consensus(
336            SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT.to_owned(),
337            ApiRequestErased::default(),
338        )
339        .await
340    }
341
342    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
343        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
344            .await
345    }
346
347    async fn download_backup(
348        &self,
349        id: &secp256k1::PublicKey,
350    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
351        self.request_with_strategy(
352            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
353            RECOVER_ENDPOINT.to_owned(),
354            ApiRequestErased::new(id),
355        )
356        .await
357    }
358
359    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
360        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
361            .await
362    }
363
364    async fn set_config_gen_connections(
365        &self,
366        info: ConfigGenConnectionsRequest,
367        auth: ApiAuth,
368    ) -> FederationResult<()> {
369        self.request_admin(
370            SET_CONFIG_GEN_CONNECTIONS_ENDPOINT,
371            ApiRequestErased::new(info),
372            auth,
373        )
374        .await
375    }
376
377    async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()> {
378        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
379            .await
380    }
381
382    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>> {
383        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
384            .await
385    }
386
387    async fn get_default_config_gen_params(
388        &self,
389        auth: ApiAuth,
390    ) -> FederationResult<ConfigGenParamsRequest> {
391        self.request_admin(
392            DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT,
393            ApiRequestErased::default(),
394            auth,
395        )
396        .await
397    }
398
399    async fn set_config_gen_params(
400        &self,
401        requested: ConfigGenParamsRequest,
402        auth: ApiAuth,
403    ) -> FederationResult<()> {
404        self.request_admin(
405            SET_CONFIG_GEN_PARAMS_ENDPOINT,
406            ApiRequestErased::new(requested),
407            auth,
408        )
409        .await
410    }
411
412    async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse> {
413        self.request_admin_no_auth(
414            CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
415            ApiRequestErased::default(),
416        )
417        .await
418    }
419
420    async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
421        self.request_admin(RUN_DKG_ENDPOINT, ApiRequestErased::default(), auth)
422            .await
423    }
424
425    async fn get_verify_config_hash(
426        &self,
427        auth: ApiAuth,
428    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
429        self.request_admin(
430            VERIFY_CONFIG_HASH_ENDPOINT,
431            ApiRequestErased::default(),
432            auth,
433        )
434        .await
435    }
436
437    async fn verified_configs(
438        &self,
439        auth: ApiAuth,
440    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
441        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
442            .await
443    }
444
445    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
446        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
447            .await
448    }
449
450    async fn status(&self) -> FederationResult<StatusResponse> {
451        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
452            .await
453    }
454
455    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
456        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
457            .await
458    }
459
460    async fn guardian_config_backup(
461        &self,
462        auth: ApiAuth,
463    ) -> FederationResult<GuardianConfigBackup> {
464        self.request_admin(
465            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
466            ApiRequestErased::default(),
467            auth,
468        )
469        .await
470    }
471
472    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
473        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
474            .await
475    }
476
477    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
478        self.request_admin(
479            RESTART_FEDERATION_SETUP_ENDPOINT,
480            ApiRequestErased::default(),
481            auth,
482        )
483        .await
484    }
485
486    async fn submit_api_announcement(
487        &self,
488        announcement_peer_id: PeerId,
489        announcement: SignedApiAnnouncement,
490    ) -> FederationResult<()> {
491        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
492            let announcement_inner = announcement.clone();
493            async move {
494                (
495                    peer_id,
496                    self.request_single_peer::<()>(
497                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
498                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
499                            signed_api_announcement: announcement_inner,
500                            peer_id: announcement_peer_id,
501                        }),
502                        peer_id,
503                    )
504                    .await,
505                )
506            }
507        }))
508        .await
509        .into_iter()
510        .filter_map(|(peer_id, result)| match result {
511            Ok(()) => None,
512            Err(e) => Some((peer_id, e)),
513        })
514        .collect::<BTreeMap<_, _>>();
515
516        if peer_errors.is_empty() {
517            Ok(())
518        } else {
519            Err(FederationError {
520                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
521                params: serde_json::to_value(announcement).expect("can be serialized"),
522                general: None,
523                peer_errors,
524            })
525        }
526    }
527
528    async fn api_announcements(
529        &self,
530        guardian: PeerId,
531    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
532        self.request_single_peer(
533            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
534            ApiRequestErased::default(),
535            guardian,
536        )
537        .await
538    }
539
540    async fn sign_api_announcement(
541        &self,
542        api_url: SafeUrl,
543        auth: ApiAuth,
544    ) -> FederationResult<SignedApiAnnouncement> {
545        self.request_admin(
546            SIGN_API_ANNOUNCEMENT_ENDPOINT,
547            ApiRequestErased::new(api_url),
548            auth,
549        )
550        .await
551    }
552
553    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
554        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
555            .await
556    }
557
558    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
559        self.request_admin(
560            BACKUP_STATISTICS_ENDPOINT,
561            ApiRequestErased::default(),
562            auth,
563        )
564        .await
565    }
566
567    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
568        self.request_single_peer(
569            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
570            ApiRequestErased::default(),
571            peer_id,
572        )
573        .await
574    }
575}