fedimint_api_client/api/
global_api.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::result;
5use std::sync::Arc;
6
7use anyhow::anyhow;
8use bitcoin::hashes::sha256;
9use bitcoin::secp256k1;
10use fedimint_core::admin_client::{
11    ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
12};
13use fedimint_core::backup::ClientBackupSnapshot;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::core::ModuleInstanceId;
16use fedimint_core::endpoint_constants::{
17    ADD_CONFIG_GEN_PEER_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
18    AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
19    CONFIG_GEN_PEERS_ENDPOINT, CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
20    DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
21    GUARDIAN_CONFIG_BACKUP_ENDPOINT, RECOVER_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
22    RUN_DKG_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT,
23    SESSION_STATUS_ENDPOINT, SET_CONFIG_GEN_CONNECTIONS_ENDPOINT, SET_CONFIG_GEN_PARAMS_ENDPOINT,
24    SET_PASSWORD_ENDPOINT, SHUTDOWN_ENDPOINT, SIGN_API_ANNOUNCEMENT_ENDPOINT,
25    START_CONSENSUS_ENDPOINT, STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
26    SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT, VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::module::audit::AuditSummary;
29use fedimint_core::module::registry::ModuleDecoderRegistry;
30use fedimint_core::module::{ApiAuth, ApiRequestErased, SerdeModuleEncoding};
31use fedimint_core::net::api_announcement::{
32    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
33};
34use fedimint_core::session_outcome::{AcceptedItem, SessionOutcome, SessionStatus};
35use fedimint_core::task::{MaybeSend, MaybeSync};
36use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
37use fedimint_core::util::SafeUrl;
38use fedimint_core::{apply, async_trait_maybe_send, NumPeersExt, PeerId, TransactionId};
39use fedimint_logging::LOG_CLIENT_NET_API;
40use futures::future::join_all;
41use jsonrpsee_core::client::Error as JsonRpcClientError;
42use serde_json::Value;
43use tokio::sync::OnceCell;
44use tracing::debug;
45
46use super::{
47    DynModuleApi, FederationApiExt, FederationError, FederationResult, GuardianConfigBackup,
48    IGlobalFederationApi, IRawFederationApi, PeerResult, StatusResponse,
49};
50use crate::query::FilterMapThreshold;
51
52/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
53/// a [`GlobalFederationApiWithCache`]
54pub trait GlobalFederationApiWithCacheExt
55where
56    Self: Sized,
57{
58    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
59}
60
61impl<T> GlobalFederationApiWithCacheExt for T
62where
63    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
64{
65    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
66        GlobalFederationApiWithCache::new(self)
67    }
68}
69
70/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
71/// a tiny bit of caching.
72///
73/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
74#[derive(Debug)]
75pub struct GlobalFederationApiWithCache<T> {
76    inner: T,
77    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
78    ///
79    /// This is mostly to avoid multiple client module recovery processes
80    /// re-requesting same blocks and putting burden on the federation.
81    ///
82    /// The LRU can be be fairly small, as if the modules are
83    /// (near-)bottlenecked on fetching blocks they will naturally
84    /// synchronize, or split into a handful of groups. And if they are not,
85    /// no LRU here is going to help them.
86    await_session_lru: Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
87
88    /// Like [`Self::await_session_lru`], but for
89    /// [`IGlobalFederationApi::get_session_status`].
90    ///
91    /// In theory these two LRUs have the same content, but one is locked by
92    /// potentially long-blocking operation, while the other non-blocking one.
93    /// Given how tiny they are, it's not worth complicating things to unify
94    /// them.
95    get_session_status_lru:
96        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
97}
98
99impl<T> GlobalFederationApiWithCache<T> {
100    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
101        Self {
102            inner,
103            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
104                NonZeroUsize::new(512).expect("is non-zero"),
105            ))),
106            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
107                NonZeroUsize::new(512).expect("is non-zero"),
108            ))),
109        }
110    }
111}
112
113impl<T> GlobalFederationApiWithCache<T>
114where
115    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
116{
117    async fn await_block_raw(
118        &self,
119        block_index: u64,
120        decoders: &ModuleDecoderRegistry,
121    ) -> anyhow::Result<SessionOutcome> {
122        debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
123        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
124            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
125            ApiRequestErased::new(block_index),
126        )
127        .await?
128        .try_into_inner(decoders)
129        .map_err(|e| anyhow!(e.to_string()))
130    }
131
132    async fn get_session_status_raw(
133        &self,
134        block_index: u64,
135        decoders: &ModuleDecoderRegistry,
136    ) -> anyhow::Result<SessionStatus> {
137        debug!(target: LOG_CLIENT_NET_API, block_index, "Fetching block's outcome from Federation");
138        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
139            SESSION_STATUS_ENDPOINT.to_string(),
140            ApiRequestErased::new(block_index),
141        )
142        .await?
143        .try_into_inner(&decoders.clone().with_fallback())
144        .map_err(|e| anyhow!(e))
145    }
146}
147
148#[apply(async_trait_maybe_send!)]
149impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
150where
151    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
152{
153    fn all_peers(&self) -> &BTreeSet<PeerId> {
154        self.inner.all_peers()
155    }
156
157    fn self_peer(&self) -> Option<PeerId> {
158        self.inner.self_peer()
159    }
160
161    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
162        self.inner.with_module(id)
163    }
164
165    /// Make request to a specific federation peer by `peer_id`
166    async fn request_raw(
167        &self,
168        peer_id: PeerId,
169        method: &str,
170        params: &[Value],
171    ) -> result::Result<Value, JsonRpcClientError> {
172        self.inner.request_raw(peer_id, method, params).await
173    }
174}
175
176#[apply(async_trait_maybe_send!)]
177impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
178where
179    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
180{
181    async fn await_block(
182        &self,
183        session_idx: u64,
184        decoders: &ModuleDecoderRegistry,
185    ) -> anyhow::Result<SessionOutcome> {
186        let mut lru_lock = self.await_session_lru.lock().await;
187
188        let entry_arc = lru_lock
189            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
190            .clone();
191
192        // we drop the lru lock so requests for other `session_idx` can work in parallel
193        drop(lru_lock);
194
195        entry_arc
196            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
197            .await
198            .cloned()
199    }
200
201    async fn get_session_status(
202        &self,
203        session_idx: u64,
204        decoders: &ModuleDecoderRegistry,
205    ) -> anyhow::Result<SessionStatus> {
206        let mut lru_lock = self.get_session_status_lru.lock().await;
207
208        let entry_arc = lru_lock
209            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
210            .clone();
211
212        // we drop the lru lock so requests for other `session_idx` can work in parallel
213        drop(lru_lock);
214
215        enum NoCacheErr {
216            Initial,
217            Pending(Vec<AcceptedItem>),
218            Err(anyhow::Error),
219        }
220        match entry_arc
221            .get_or_try_init(|| async {
222                match self.get_session_status_raw(session_idx, decoders).await {
223                    Err(e) => Err(NoCacheErr::Err(e)),
224                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
225                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
226                    // only status we can cache (hance outer Ok)
227                    Ok(SessionStatus::Complete(s)) => Ok(s),
228                }
229            })
230            .await
231            .cloned()
232        {
233            Ok(s) => Ok(SessionStatus::Complete(s)),
234            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
235            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
236            Err(NoCacheErr::Err(e)) => Err(e),
237        }
238    }
239
240    /// Submit a transaction for inclusion
241    async fn submit_transaction(
242        &self,
243        tx: Transaction,
244    ) -> FederationResult<SerdeModuleEncoding<TransactionSubmissionOutcome>> {
245        self.request_current_consensus(
246            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
247            ApiRequestErased::new(SerdeTransaction::from(&tx)),
248        )
249        .await
250    }
251
252    async fn session_count(&self) -> FederationResult<u64> {
253        self.request_current_consensus(
254            SESSION_COUNT_ENDPOINT.to_owned(),
255            ApiRequestErased::default(),
256        )
257        .await
258    }
259
260    async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId> {
261        self.request_current_consensus(
262            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
263            ApiRequestErased::new(txid),
264        )
265        .await
266    }
267
268    async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash> {
269        self.request_current_consensus(
270            SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT.to_owned(),
271            ApiRequestErased::default(),
272        )
273        .await
274    }
275
276    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
277        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
278            .await
279    }
280
281    async fn download_backup(
282        &self,
283        id: &secp256k1::PublicKey,
284    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
285        self.request_with_strategy(
286            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
287            RECOVER_ENDPOINT.to_owned(),
288            ApiRequestErased::new(id),
289        )
290        .await
291    }
292
293    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
294        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
295            .await
296    }
297
298    async fn set_config_gen_connections(
299        &self,
300        info: ConfigGenConnectionsRequest,
301        auth: ApiAuth,
302    ) -> FederationResult<()> {
303        self.request_admin(
304            SET_CONFIG_GEN_CONNECTIONS_ENDPOINT,
305            ApiRequestErased::new(info),
306            auth,
307        )
308        .await
309    }
310
311    async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()> {
312        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
313            .await
314    }
315
316    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>> {
317        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
318            .await
319    }
320
321    async fn get_default_config_gen_params(
322        &self,
323        auth: ApiAuth,
324    ) -> FederationResult<ConfigGenParamsRequest> {
325        self.request_admin(
326            DEFAULT_CONFIG_GEN_PARAMS_ENDPOINT,
327            ApiRequestErased::default(),
328            auth,
329        )
330        .await
331    }
332
333    async fn set_config_gen_params(
334        &self,
335        requested: ConfigGenParamsRequest,
336        auth: ApiAuth,
337    ) -> FederationResult<()> {
338        self.request_admin(
339            SET_CONFIG_GEN_PARAMS_ENDPOINT,
340            ApiRequestErased::new(requested),
341            auth,
342        )
343        .await
344    }
345
346    async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse> {
347        self.request_admin_no_auth(
348            CONSENSUS_CONFIG_GEN_PARAMS_ENDPOINT,
349            ApiRequestErased::default(),
350        )
351        .await
352    }
353
354    async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
355        self.request_admin(RUN_DKG_ENDPOINT, ApiRequestErased::default(), auth)
356            .await
357    }
358
359    async fn get_verify_config_hash(
360        &self,
361        auth: ApiAuth,
362    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
363        self.request_admin(
364            VERIFY_CONFIG_HASH_ENDPOINT,
365            ApiRequestErased::default(),
366            auth,
367        )
368        .await
369    }
370
371    async fn verified_configs(
372        &self,
373        auth: ApiAuth,
374    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
375        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
376            .await
377    }
378
379    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
380        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
381            .await
382    }
383
384    async fn status(&self) -> FederationResult<StatusResponse> {
385        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
386            .await
387    }
388
389    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
390        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
391            .await
392    }
393
394    async fn guardian_config_backup(
395        &self,
396        auth: ApiAuth,
397    ) -> FederationResult<GuardianConfigBackup> {
398        self.request_admin(
399            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
400            ApiRequestErased::default(),
401            auth,
402        )
403        .await
404    }
405
406    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
407        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
408            .await
409    }
410
411    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
412        self.request_admin(
413            RESTART_FEDERATION_SETUP_ENDPOINT,
414            ApiRequestErased::default(),
415            auth,
416        )
417        .await
418    }
419
420    async fn submit_api_announcement(
421        &self,
422        announcement_peer_id: PeerId,
423        announcement: SignedApiAnnouncement,
424    ) -> FederationResult<()> {
425        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
426            let announcement_inner = announcement.clone();
427            async move {
428                (
429                    peer_id,
430                    self.request_single_peer(
431                        None,
432                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
433                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
434                            signed_api_announcement: announcement_inner,
435                            peer_id: announcement_peer_id,
436                        }),
437                        peer_id,
438                    )
439                    .await,
440                )
441            }
442        }))
443        .await
444        .into_iter()
445        .filter_map(|(peer_id, result)| match result {
446            Ok(_) => None,
447            Err(e) => Some((peer_id, e.into())),
448        })
449        .collect::<BTreeMap<_, _>>();
450
451        if peer_errors.is_empty() {
452            Ok(())
453        } else {
454            Err(FederationError {
455                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
456                params: serde_json::to_value(announcement).expect("can be serialized"),
457                general: None,
458                peers: peer_errors,
459            })
460        }
461    }
462
463    async fn api_announcements(
464        &self,
465        guardian: PeerId,
466    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
467        self.request_single_peer_typed(
468            None,
469            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
470            ApiRequestErased::default(),
471            guardian,
472        )
473        .await
474    }
475
476    async fn sign_api_announcement(
477        &self,
478        api_url: SafeUrl,
479        auth: ApiAuth,
480    ) -> FederationResult<SignedApiAnnouncement> {
481        self.request_admin(
482            SIGN_API_ANNOUNCEMENT_ENDPOINT,
483            ApiRequestErased::new(api_url),
484            auth,
485        )
486        .await
487    }
488
489    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
490        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
491            .await
492    }
493
494    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
495        self.request_single_peer_typed(
496            None,
497            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
498            ApiRequestErased::default(),
499            peer_id,
500        )
501        .await
502    }
503}