fedimint_api_client/api/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::fmt::Debug;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6use std::{cmp, result};
7
8use anyhow::anyhow;
9#[cfg(all(feature = "tor", not(target_family = "wasm")))]
10use arti_client::{TorAddr, TorClient, TorClientConfig};
11use base64::Engine as _;
12use bitcoin::hashes::sha256;
13use bitcoin::secp256k1;
14pub use error::{FederationError, OutputOutcomeError, PeerError};
15use fedimint_core::admin_client::{
16    ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
17    ServerStatus,
18};
19use fedimint_core::backup::ClientBackupSnapshot;
20use fedimint_core::core::backup::SignedBackupRequest;
21use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::endpoint_constants::AWAIT_OUTPUT_OUTCOME_ENDPOINT;
24use fedimint_core::fmt_utils::AbbreviateDebug;
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding};
29use fedimint_core::net::api_announcement::SignedApiAnnouncement;
30use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
31use fedimint_core::task::{MaybeSend, MaybeSync};
32use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
33use fedimint_core::util::SafeUrl;
34use fedimint_core::{
35    apply, async_trait_maybe_send, dyn_newtype_define, runtime, NumPeersExt, OutPoint, PeerId,
36    TransactionId,
37};
38use fedimint_logging::LOG_CLIENT_NET_API;
39use futures::stream::FuturesUnordered;
40use futures::{Future, StreamExt};
41use itertools::Itertools;
42use jsonrpsee_core::client::ClientT;
43pub use jsonrpsee_core::client::Error as JsonRpcClientError;
44use jsonrpsee_core::DeserializeOwned;
45#[cfg(target_family = "wasm")]
46use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
47#[cfg(not(target_family = "wasm"))]
48use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
49#[cfg(not(target_family = "wasm"))]
50use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
51use net::Connector;
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54#[cfg(not(target_family = "wasm"))]
55use tokio_rustls::rustls::RootCertStore;
56#[cfg(all(feature = "tor", not(target_family = "wasm")))]
57use tokio_rustls::{rustls::ClientConfig as TlsClientConfig, TlsConnector};
58use tracing::{debug, instrument, trace};
59
60use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
61mod error;
62mod global_api;
63pub mod net;
64mod peer;
65
66pub use global_api::{GlobalFederationApiWithCache, GlobalFederationApiWithCacheExt};
67use peer::FederationPeer;
68
69pub type PeerResult<T> = Result<T, PeerError>;
70pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
71pub type FederationResult<T> = Result<T, FederationError>;
72pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
73
74pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
75
76/// Set of api versions for each component (core + modules)
77///
78/// E.g. result of federated common api versions discovery.
79#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
80pub struct ApiVersionSet {
81    pub core: ApiVersion,
82    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
83}
84
85/// An API (module or global) that can query a federation
86#[apply(async_trait_maybe_send!)]
87pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
88    /// List of all federation peers for the purpose of iterating each peer
89    /// in the federation.
90    ///
91    /// The underlying implementation is responsible for knowing how many
92    /// and `PeerId`s of each. The caller of this interface most probably
93    /// have some idea as well, but passing this set across every
94    /// API call to the federation would be inconvenient.
95    fn all_peers(&self) -> &BTreeSet<PeerId>;
96
97    /// PeerId of the Guardian node, if set
98    ///
99    /// This is for using Client in a "Admin" mode, making authenticated
100    /// calls to own `fedimintd` instance.
101    fn self_peer(&self) -> Option<PeerId>;
102
103    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
104
105    /// Make request to a specific federation peer by `peer_id`
106    async fn request_raw(
107        &self,
108        peer_id: PeerId,
109        method: &str,
110        params: &[Value],
111    ) -> result::Result<Value, JsonRpcClientError>;
112}
113
114/// An extension trait allowing to making federation-wide API call on top
115/// [`IRawFederationApi`].
116#[apply(async_trait_maybe_send!)]
117pub trait FederationApiExt: IRawFederationApi {
118    /// Make a request to a single peer in the federation with an optional
119    /// timeout.
120    async fn request_single_peer(
121        &self,
122        timeout: Option<Duration>,
123        method: String,
124        params: ApiRequestErased,
125        peer_id: PeerId,
126    ) -> JsonRpcResult<jsonrpsee_core::JsonValue> {
127        let request = async {
128            self.request_raw(peer_id, &method, &[params.to_json()])
129                .await
130        };
131
132        if let Some(timeout) = timeout {
133            match fedimint_core::runtime::timeout(timeout, request).await {
134                Ok(result) => result,
135                Err(_timeout) => Err(JsonRpcClientError::RequestTimeout),
136            }
137        } else {
138            request.await
139        }
140    }
141    async fn request_single_peer_typed<Ret>(
142        &self,
143        timeout: Option<Duration>,
144        method: String,
145        params: ApiRequestErased,
146        peer_id: PeerId,
147    ) -> PeerResult<Ret>
148    where
149        Ret: DeserializeOwned,
150    {
151        self.request_single_peer(timeout, method, params, peer_id)
152            .await
153            .map_err(PeerError::Rpc)
154            .and_then(|v| {
155                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
156            })
157    }
158
159    /// Like [`Self::request_single_peer`], but API more like
160    /// [`Self::request_with_strategy`].
161    async fn request_single_peer_federation<FedRet>(
162        &self,
163        timeout: Option<Duration>,
164        method: String,
165        params: ApiRequestErased,
166        peer_id: PeerId,
167    ) -> FederationResult<FedRet>
168    where
169        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
170    {
171        Ok(self
172            .request_single_peer(timeout, method.clone(), params.clone(), peer_id)
173            .await
174            .map_err(PeerError::Rpc)
175            .and_then(|v| {
176                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
177            })
178            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))?)
179    }
180
181    /// Make an aggregate request to federation, using `strategy` to logically
182    /// merge the responses.
183    async fn request_with_strategy<PeerRet: serde::de::DeserializeOwned, FedRet: Debug>(
184        &self,
185        mut strategy: impl QueryStrategy<PeerRet, FedRet> + MaybeSend,
186        method: String,
187        params: ApiRequestErased,
188    ) -> FederationResult<FedRet> {
189        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
190        // completed results from it and we don't do any `await`s when
191        // processing them, it should be totally OK.
192        #[cfg(not(target_family = "wasm"))]
193        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
194        #[cfg(target_family = "wasm")]
195        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
196
197        let peers = self.all_peers();
198
199        for peer_id in peers {
200            futures.push(Box::pin(async {
201                let request = async {
202                    self.request_raw(*peer_id, &method, &[params.to_json()])
203                        .await
204                        .map(AbbreviateDebug)
205                };
206
207                PeerResponse {
208                    peer: *peer_id,
209                    result: request.await,
210                }
211            }));
212        }
213
214        let mut peer_delay_ms = BTreeMap::new();
215
216        // Delegates the response handling to the `QueryStrategy` with an exponential
217        // back-off with every new set of requests
218        let max_delay_ms = 1000;
219        loop {
220            let response = futures.next().await;
221            trace!(target: LOG_CLIENT_NET_API, ?response, method, params = ?AbbreviateDebug(params.to_json()), "Received peer response");
222            match response {
223                Some(PeerResponse { peer, result }) => {
224                    let result: PeerResult<PeerRet> =
225                        result.map_err(PeerError::Rpc).and_then(|o| {
226                            serde_json::from_value::<PeerRet>(o.0)
227                                .map_err(|e| PeerError::ResponseDeserialization(e.into()))
228                        });
229
230                    let strategy_step = strategy.process(peer, result);
231                    trace!(
232                        target: LOG_CLIENT_NET_API,
233                        method,
234                        ?params,
235                        ?strategy_step,
236                        "Taking strategy step to the response after peer response"
237                    );
238                    match strategy_step {
239                        QueryStep::Retry(peers) => {
240                            for retry_peer in peers {
241                                let mut delay_ms =
242                                    peer_delay_ms.get(&retry_peer).copied().unwrap_or(10);
243                                delay_ms = cmp::min(max_delay_ms, delay_ms * 2);
244                                peer_delay_ms.insert(retry_peer, delay_ms);
245
246                                futures.push(Box::pin({
247                                    let method = &method;
248                                    let params = &params;
249                                    async move {
250                                        // Note: we need to sleep inside the retrying future,
251                                        // so that `futures` is being polled continuously
252                                        runtime::sleep(Duration::from_millis(delay_ms)).await;
253                                        PeerResponse {
254                                            peer: retry_peer,
255                                            result: self
256                                                .request_raw(
257                                                    retry_peer,
258                                                    method,
259                                                    &[params.to_json()],
260                                                )
261                                                .await
262                                                .map(AbbreviateDebug),
263                                        }
264                                    }
265                                }));
266                            }
267                        }
268                        QueryStep::Continue => {}
269                        QueryStep::Failure { general, peers } => {
270                            return Err(FederationError {
271                                method: method.clone(),
272                                params: params.params.clone(),
273                                general,
274                                peers,
275                            })
276                        }
277                        QueryStep::Success(response) => return Ok(response),
278                    }
279                }
280                None => {
281                    panic!("Query strategy ran out of peers to query without returning a result");
282                }
283            }
284        }
285    }
286
287    async fn request_current_consensus<Ret>(
288        &self,
289        method: String,
290        params: ApiRequestErased,
291    ) -> FederationResult<Ret>
292    where
293        Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
294    {
295        self.request_with_strategy(
296            ThresholdConsensus::new(self.all_peers().to_num_peers()),
297            method,
298            params,
299        )
300        .await
301    }
302
303    async fn request_admin<Ret>(
304        &self,
305        method: &str,
306        params: ApiRequestErased,
307        auth: ApiAuth,
308    ) -> FederationResult<Ret>
309    where
310        Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
311    {
312        let Some(self_peer_id) = self.self_peer() else {
313            return Err(FederationError::general(
314                method,
315                params,
316                anyhow::format_err!("Admin peer_id not set"),
317            ));
318        };
319        self.request_single_peer_federation(
320            None,
321            method.into(),
322            params.with_auth(auth),
323            self_peer_id,
324        )
325        .await
326    }
327
328    async fn request_admin_no_auth<Ret>(
329        &self,
330        method: &str,
331        params: ApiRequestErased,
332    ) -> FederationResult<Ret>
333    where
334        Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
335    {
336        let Some(self_peer_id) = self.self_peer() else {
337            return Err(FederationError::general(
338                method,
339                params,
340                anyhow::format_err!("Admin peer_id not set"),
341            ));
342        };
343
344        self.request_single_peer_federation(None, method.into(), params, self_peer_id)
345            .await
346    }
347}
348
349#[apply(async_trait_maybe_send!)]
350impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
351
352/// Trait marker for the module (non-global) endpoints
353pub trait IModuleFederationApi: IRawFederationApi {}
354
355dyn_newtype_define! {
356    #[derive(Clone)]
357    pub DynModuleApi(Arc<IModuleFederationApi>)
358}
359
360dyn_newtype_define! {
361    #[derive(Clone)]
362    pub DynGlobalApi(Arc<IGlobalFederationApi>)
363}
364
365impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
366    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
367        self.inner.as_ref()
368    }
369}
370
371impl DynGlobalApi {
372    pub fn new_admin(
373        peer: PeerId,
374        url: SafeUrl,
375        api_secret: &Option<String>,
376        connector: &Connector,
377    ) -> DynGlobalApi {
378        GlobalFederationApiWithCache::new(
379            WsFederationApi::new(connector, vec![(peer, url)], api_secret).with_self_peer_id(peer),
380        )
381        .into()
382    }
383
384    // FIXME: (@leonardo) Should we have the option to do DKG and config related
385    // actions through Tor ? Should we add the `Connector` choice to
386    // ConfigParams then ?
387    pub fn from_pre_peer_id_admin_endpoint(url: SafeUrl, api_secret: &Option<String>) -> Self {
388        // PeerIds are used only for informational purposes, but just in case, make a
389        // big number so it stands out
390        let peer_id = PeerId::from(1024);
391        GlobalFederationApiWithCache::new(
392            WsFederationApi::new(&Connector::default(), vec![(peer_id, url)], api_secret)
393                .with_self_peer_id(peer_id),
394        )
395        .into()
396    }
397
398    pub fn from_single_endpoint(
399        peer: PeerId,
400        url: SafeUrl,
401        api_secret: &Option<String>,
402        connector: &Connector,
403    ) -> Self {
404        GlobalFederationApiWithCache::new(WsFederationApi::new(
405            connector,
406            vec![(peer, url)],
407            api_secret,
408        ))
409        .into()
410    }
411
412    pub fn from_endpoints(
413        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
414        api_secret: &Option<String>,
415        connector: &Connector,
416    ) -> Self {
417        GlobalFederationApiWithCache::new(WsFederationApi::new(connector, peers, api_secret)).into()
418    }
419
420    pub fn from_invite_code(connector: &Connector, invite_code: &InviteCode) -> Self {
421        GlobalFederationApiWithCache::new(WsFederationApi::new(
422            connector,
423            invite_code.peers().into_iter().collect_vec(),
424            &invite_code.api_secret(),
425        ))
426        .into()
427    }
428
429    pub async fn await_output_outcome<R>(
430        &self,
431        outpoint: OutPoint,
432        timeout: Duration,
433        module_decoder: &Decoder,
434    ) -> OutputOutcomeResult<R>
435    where
436        R: OutputOutcome,
437    {
438        fedimint_core::runtime::timeout(timeout, async {
439            let outcome: SerdeOutputOutcome = self
440                .inner
441                .request_current_consensus(
442                    AWAIT_OUTPUT_OUTCOME_ENDPOINT.to_owned(),
443                    ApiRequestErased::new(outpoint),
444                )
445                .await
446                .map_err(OutputOutcomeError::Federation)?;
447
448            deserialize_outcome(&outcome, module_decoder)
449        })
450        .await
451        .map_err(|_| OutputOutcomeError::Timeout(timeout))?
452    }
453}
454
455/// The API for the global (non-module) endpoints
456#[apply(async_trait_maybe_send!)]
457pub trait IGlobalFederationApi: IRawFederationApi {
458    async fn submit_transaction(
459        &self,
460        tx: Transaction,
461    ) -> FederationResult<SerdeModuleEncoding<TransactionSubmissionOutcome>>;
462
463    async fn await_block(
464        &self,
465        block_index: u64,
466        decoders: &ModuleDecoderRegistry,
467    ) -> anyhow::Result<SessionOutcome>;
468
469    async fn get_session_status(
470        &self,
471        block_index: u64,
472        decoders: &ModuleDecoderRegistry,
473    ) -> anyhow::Result<SessionStatus>;
474
475    async fn session_count(&self) -> FederationResult<u64>;
476
477    async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId>;
478
479    /// Fetches the server consensus hash if enough peers agree on it
480    async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash>;
481
482    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
483
484    async fn download_backup(
485        &self,
486        id: &secp256k1::PublicKey,
487    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
488
489    /// Sets the password used to decrypt the configs and authenticate
490    ///
491    /// Must be called first before any other calls to the API
492    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
493
494    /// During config gen, sets the server connection containing our endpoints
495    ///
496    /// Optionally sends our server info to the config gen leader using
497    /// `add_config_gen_peer`
498    async fn set_config_gen_connections(
499        &self,
500        info: ConfigGenConnectionsRequest,
501        auth: ApiAuth,
502    ) -> FederationResult<()>;
503
504    /// During config gen, used for an API-to-API call that adds a peer's server
505    /// connection info to the leader.
506    ///
507    /// Note this call will fail until the leader has their API running and has
508    /// `set_server_connections` so clients should retry.
509    ///
510    /// This call is not authenticated because it's guardian-to-guardian
511    async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;
512
513    /// During config gen, gets all the server connections we've received from
514    /// peers using `add_config_gen_peer`
515    ///
516    /// Could be called on the leader, so it's not authenticated
517    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;
518
519    /// Gets the default config gen params which can be configured by the
520    /// leader, gives them a template to modify
521    async fn get_default_config_gen_params(
522        &self,
523        auth: ApiAuth,
524    ) -> FederationResult<ConfigGenParamsRequest>;
525
526    /// Leader sets the consensus params, everyone sets the local params
527    ///
528    /// After calling this `ConfigGenParams` can be created for DKG
529    async fn set_config_gen_params(
530        &self,
531        requested: ConfigGenParamsRequest,
532        auth: ApiAuth,
533    ) -> FederationResult<()>;
534
535    /// Returns the consensus config gen params, followers will delegate this
536    /// call to the leader.  Once this endpoint returns successfully we can run
537    /// DKG.
538    async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse>;
539
540    /// Runs DKG, can only be called once after configs have been generated in
541    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
542    /// error and config gen must be restarted.
543    async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
544
545    /// After DKG, returns the hash of the consensus config tweaked with our id.
546    /// We need to share this with all other peers to complete verification.
547    async fn get_verify_config_hash(
548        &self,
549        auth: ApiAuth,
550    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
551
552    /// Updates local state and notify leader that we have verified configs.
553    /// This allows for a synchronization point, before we start consensus.
554    async fn verified_configs(
555        &self,
556        auth: ApiAuth,
557    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
558
559    /// Reads the configs from the disk, starts the consensus server, and shuts
560    /// down the config gen API to start the Fedimint API
561    ///
562    /// Clients may receive an error due to forced shutdown, should call the
563    /// `server_status` to see if consensus has started.
564    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
565
566    /// Returns the status of the server
567    async fn status(&self) -> FederationResult<StatusResponse>;
568
569    /// Show an audit across all modules
570    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
571
572    /// Download the guardian config to back it up
573    async fn guardian_config_backup(&self, auth: ApiAuth)
574        -> FederationResult<GuardianConfigBackup>;
575
576    /// Check auth credentials
577    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
578
579    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
580
581    /// Publish our signed API announcement to other guardians
582    async fn submit_api_announcement(
583        &self,
584        peer_id: PeerId,
585        announcement: SignedApiAnnouncement,
586    ) -> FederationResult<()>;
587
588    async fn api_announcements(
589        &self,
590        guardian: PeerId,
591    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
592
593    async fn sign_api_announcement(
594        &self,
595        api_url: SafeUrl,
596        auth: ApiAuth,
597    ) -> FederationResult<SignedApiAnnouncement>;
598
599    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
600
601    /// Returns the fedimintd version a peer is running
602    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
603}
604
605pub fn deserialize_outcome<R>(
606    outcome: &SerdeOutputOutcome,
607    module_decoder: &Decoder,
608) -> OutputOutcomeResult<R>
609where
610    R: OutputOutcome + MaybeSend,
611{
612    let dyn_outcome = outcome
613        .try_into_inner_known_module_kind(module_decoder)
614        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
615
616    let source_instance = dyn_outcome.module_instance_id();
617
618    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
619        let target_type = std::any::type_name::<R>();
620        OutputOutcomeError::ResponseDeserialization(anyhow!(
621            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
622        ))
623    })
624}
625
626/// Mint API client that will try to run queries against all `peers` expecting
627/// equal results from at least `min_eq_results` of them. Peers that return
628/// differing results are returned as a peer faults list.
629#[derive(Debug, Clone)]
630pub struct WsFederationApi<C = WsClient> {
631    peer_ids: BTreeSet<PeerId>,
632    self_peer_id: Option<PeerId>,
633    peers: Arc<Vec<FederationPeer<C>>>,
634    module_id: Option<ModuleInstanceId>,
635}
636
637impl<C: JsonRpcClient + Debug + 'static> IModuleFederationApi for WsFederationApi<C> {}
638
639/// Implementation of API calls over WebSockets
640///
641/// Can function as either the global or module API
642#[apply(async_trait_maybe_send!)]
643impl<C: JsonRpcClient + Debug + 'static> IRawFederationApi for WsFederationApi<C> {
644    fn all_peers(&self) -> &BTreeSet<PeerId> {
645        &self.peer_ids
646    }
647
648    fn self_peer(&self) -> Option<PeerId> {
649        self.self_peer_id
650    }
651
652    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
653        WsFederationApi {
654            peer_ids: self.peer_ids.clone(),
655            peers: self.peers.clone(),
656            module_id: Some(id),
657            self_peer_id: self.self_peer_id,
658        }
659        .into()
660    }
661
662    async fn request_raw(
663        &self,
664        peer_id: PeerId,
665        method: &str,
666        params: &[Value],
667    ) -> JsonRpcResult<Value> {
668        let peer = self
669            .peers
670            .iter()
671            .find(|m| m.peer_id == peer_id)
672            .ok_or_else(|| JsonRpcClientError::Custom(format!("Invalid peer_id: {peer_id}")))?;
673
674        let method = match self.module_id {
675            None => method.to_string(),
676            Some(id) => format!("module_{id}_{method}"),
677        };
678        peer.request(&method, params).await
679    }
680}
681
682#[apply(async_trait_maybe_send!)]
683pub trait JsonRpcClient: ClientT + Sized + MaybeSend + MaybeSync {
684    async fn connect(
685        url: &SafeUrl,
686        api_secret: Option<String>,
687    ) -> result::Result<Self, JsonRpcClientError>;
688
689    #[cfg(all(feature = "tor", not(target_family = "wasm")))]
690    async fn connect_with_tor(
691        url: &SafeUrl,
692        api_secret: Option<String>,
693    ) -> result::Result<Self, JsonRpcClientError>;
694
695    fn is_connected(&self) -> bool;
696}
697
698#[apply(async_trait_maybe_send!)]
699impl JsonRpcClient for WsClient {
700    async fn connect(
701        url: &SafeUrl,
702        api_secret: Option<String>,
703    ) -> result::Result<Self, JsonRpcClientError> {
704        #[cfg(not(target_family = "wasm"))]
705        let mut client = {
706            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
707            let mut root_certs = RootCertStore::empty();
708            root_certs.extend(webpki_roots);
709
710            let tls_cfg = CustomCertStore::builder()
711                .with_root_certificates(root_certs)
712                .with_no_client_auth();
713
714            WsClientBuilder::default()
715                .max_concurrent_requests(u16::MAX as usize)
716                .with_custom_cert_store(tls_cfg)
717        };
718
719        #[cfg(target_family = "wasm")]
720        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
721
722        if let Some(api_secret) = api_secret {
723            #[cfg(not(target_family = "wasm"))]
724            {
725                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
726                // but we can set up the headers manually
727                let mut headers = HeaderMap::new();
728
729                let auth = base64::engine::general_purpose::STANDARD
730                    .encode(format!("fedimint:{api_secret}"));
731
732                headers.insert(
733                    "Authorization",
734                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
735                );
736
737                client = client.set_headers(headers);
738            }
739            #[cfg(target_family = "wasm")]
740            {
741                // on wasm, url will be handled by the browser, which should take care of
742                // `user:pass@...`
743                let mut url = url.clone();
744                url.set_username("fedimint").map_err(|_| {
745                    JsonRpcClientError::Transport(anyhow::format_err!("invalid username").into())
746                })?;
747                url.set_password(Some(&api_secret)).map_err(|_| {
748                    JsonRpcClientError::Transport(anyhow::format_err!("invalid secret").into())
749                })?;
750                return client.build(url.as_str()).await;
751            }
752        }
753        client.build(url.as_str()).await
754    }
755
756    #[cfg(all(feature = "tor", not(target_family = "wasm")))]
757    async fn connect_with_tor(
758        url: &SafeUrl,
759        api_secret: Option<String>,
760    ) -> result::Result<Self, JsonRpcClientError> {
761        let tor_config = TorClientConfig::default();
762        let tor_client = TorClient::create_bootstrapped(tor_config)
763            .await
764            .map_err(|e| JsonRpcClientError::Transport(e.into()))?
765            .isolated_client();
766
767        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
768
769        // TODO: (@leonardo) should we implement our `IntoTorAddr` for `SafeUrl`
770        // instead?
771        let addr = (
772            url.host_str()
773                .expect("It should've asserted for `host` on construction"),
774            url.port_or_known_default()
775                .expect("It should've asserted for `port`, or used a default one, on construction"),
776        );
777        let tor_addr = TorAddr::from(addr).map_err(|e| JsonRpcClientError::Transport(e.into()))?;
778        let tor_addr_clone = tor_addr.clone();
779
780        debug!(
781            ?tor_addr,
782            ?addr,
783            "Successfully created `TorAddr` for given address (i.e. host and port)"
784        );
785
786        // TODO: It can be updated to use `is_onion_address()` implementation,
787        // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
788        let anonymized_stream = if url.is_onion_address() {
789            let mut stream_prefs = arti_client::StreamPrefs::default();
790            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
791
792            let anonymized_stream = tor_client
793                .connect_with_prefs(tor_addr, &stream_prefs)
794                .await
795                .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
796
797            debug!(
798                ?tor_addr_clone,
799                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
800            );
801            anonymized_stream
802        } else {
803            let anonymized_stream = tor_client
804                .connect(tor_addr)
805                .await
806                .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
807
808            debug!(?tor_addr_clone, "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`");
809            anonymized_stream
810        };
811
812        let is_tls = match url.scheme() {
813            "wss" => true,
814            "ws" => false,
815            unexpected_scheme => {
816                let error =
817                    format!("`{unexpected_scheme}` not supported, it's expected `ws` or `wss`!");
818                return Err(JsonRpcClientError::Transport(anyhow!(error).into()));
819            }
820        };
821
822        let tls_connector = if is_tls {
823            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
824            let mut root_certs = RootCertStore::empty();
825            root_certs.extend(webpki_roots);
826
827            let tls_config = TlsClientConfig::builder()
828                .with_root_certificates(root_certs)
829                .with_no_client_auth();
830            let tls_connector = TlsConnector::from(Arc::new(tls_config));
831            Some(tls_connector)
832        } else {
833            None
834        };
835
836        let mut ws_client_builder =
837            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
838
839        if let Some(api_secret) = api_secret {
840            // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
841            // but we can set up the headers manually
842            let mut headers = HeaderMap::new();
843
844            let auth =
845                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
846
847            headers.insert(
848                "Authorization",
849                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
850            );
851
852            ws_client_builder = ws_client_builder.set_headers(headers);
853        }
854
855        match tls_connector {
856            None => {
857                return ws_client_builder
858                    .build_with_stream(url.as_str(), anonymized_stream)
859                    .await;
860            }
861            Some(tls_connector) => {
862                let host = url.host_str().map(ToOwned::to_owned).ok_or_else(|| {
863                    JsonRpcClientError::Transport(anyhow!("Invalid host!").into())
864                })?;
865
866                // FIXME: (@leonardo) Is this leaking any data ? Should investigate it further
867                // if it's really needed.
868                let server_name = rustls_pki_types::ServerName::try_from(host)
869                    .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
870
871                let anonymized_tls_stream = tls_connector
872                    .connect(server_name, anonymized_stream)
873                    .await
874                    .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
875
876                return ws_client_builder
877                    .build_with_stream(url.as_str(), anonymized_tls_stream)
878                    .await;
879            }
880        }
881    }
882
883    fn is_connected(&self) -> bool {
884        self.is_connected()
885    }
886}
887
888impl WsFederationApi<WsClient> {
889    /// Creates a new API client
890    pub fn new(
891        connector: &Connector,
892        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
893        api_secret: &Option<String>,
894    ) -> Self {
895        Self::new_with_client(connector, peers, None, api_secret)
896    }
897
898    pub fn new_admin(
899        peer: PeerId,
900        url: SafeUrl,
901        api_secret: &Option<String>,
902        connector: &Connector,
903    ) -> Self {
904        WsFederationApi::new(connector, vec![(peer, url)], api_secret).with_self_peer_id(peer)
905    }
906
907    pub fn from_endpoints(
908        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
909        api_secret: &Option<String>,
910        connector: &Connector,
911    ) -> Self {
912        WsFederationApi::new(connector, peers, api_secret)
913    }
914
915    pub fn with_self_peer_id(self, self_peer_id: PeerId) -> Self {
916        Self {
917            self_peer_id: Some(self_peer_id),
918            ..self
919        }
920    }
921}
922
923impl<C> WsFederationApi<C>
924where
925    C: JsonRpcClient + 'static,
926{
927    /// Returns the [`PeerId`]'s for the current [`WsFederationApi`]
928    pub fn peers(&self) -> Vec<PeerId> {
929        self.peers.iter().map(|peer| peer.peer_id).collect()
930    }
931
932    /// Creates a new [`WsFederationApi`] client, for given [`Connector`].
933    pub fn new_with_client(
934        connector: &Connector,
935        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
936        self_peer_id: Option<PeerId>,
937        api_secret: &Option<String>,
938    ) -> Self {
939        let (peer_connections, peer_ids) = peers
940            .into_iter()
941            .map(|(peer_id, url)| {
942                assert!(
943                    url.port_or_known_default().is_some(),
944                    "API client requires a port"
945                );
946                assert!(url.host().is_some(), "API client requires a target host");
947
948                (
949                    FederationPeer::new(*connector, url, peer_id, api_secret.clone()),
950                    peer_id,
951                )
952            })
953            .unzip();
954
955        WsFederationApi {
956            peer_ids,
957            self_peer_id,
958            peers: Arc::new(peer_connections),
959            module_id: None,
960        }
961    }
962}
963
964#[derive(Debug)]
965pub struct PeerResponse<R> {
966    pub peer: PeerId,
967    pub result: JsonRpcResult<R>,
968}
969
970impl<C> FederationPeer<C>
971where
972    C: JsonRpcClient + 'static,
973{
974    #[instrument(level = "trace", fields(peer = %self.peer_id, %method), skip_all)]
975    pub async fn request(&self, method: &str, params: &[Value]) -> JsonRpcResult<Value> {
976        // Strategies using timeouts often depend on failing requests returning quickly,
977        // so every request gets only one reconnection attempt.
978        const RETRIES: usize = 1;
979
980        for attempts in 0.. {
981            // The `match` statement below should always return if `RETRIES <= attempts`, so
982            // if we're looping again and `attempts` is greater than `RETRIES`, we have a
983            // bug.
984            debug_assert!(attempts <= RETRIES);
985
986            let rclient = self.client.read().await;
987            match rclient.client.get_try().await {
988                Ok(client) if client.is_connected() => {
989                    return client.request::<_, _>(method, params).await;
990                }
991                Err(e) => {
992                    if RETRIES <= attempts {
993                        return Err(JsonRpcClientError::Transport(e.into()));
994                    }
995                    debug!(target: LOG_CLIENT_NET_API, err=%e, "Triggering reconnection after connection error");
996                }
997                Ok(_client) => {
998                    if RETRIES <= attempts {
999                        return Err(JsonRpcClientError::Transport(
1000                            anyhow::format_err!("Disconnected").into(),
1001                        ));
1002                    }
1003                    debug!(target: LOG_CLIENT_NET_API, "Triggering reconnection after disconnection");
1004                }
1005            };
1006
1007            // Drop read lock so we can take the write lock, which is needed to reconnect.
1008            drop(rclient);
1009            let mut wclient = self.client.write().await;
1010            match wclient.client.get_try().await {
1011                Ok(client) if client.is_connected() => {
1012                    // someone else connected, just loop again
1013                    trace!(target: LOG_CLIENT_NET_API, "Some other request reconnected client, retrying");
1014                }
1015                _ => {
1016                    wclient.reconnect(
1017                        self.connector,
1018                        self.peer_id,
1019                        self.url.clone(),
1020                        self.api_secret.clone(),
1021                    );
1022                }
1023            }
1024        }
1025
1026        unreachable!();
1027    }
1028}
1029
1030/// The status of a server, including how it views its peers
1031#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1032pub struct FederationStatus {
1033    pub session_count: u64,
1034    pub status_by_peer: HashMap<PeerId, PeerStatus>,
1035    pub peers_online: u64,
1036    pub peers_offline: u64,
1037    /// This should always be 0 if everything is okay, so a monitoring tool
1038    /// should generate an alert if this is not the case.
1039    pub peers_flagged: u64,
1040    pub scheduled_shutdown: Option<u64>,
1041}
1042
1043#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1044pub struct PeerStatus {
1045    pub last_contribution: Option<u64>,
1046    pub connection_status: PeerConnectionStatus,
1047    /// Indicates that this peer needs attention from the operator since
1048    /// it has not contributed to the consensus in a long time
1049    pub flagged: bool,
1050}
1051
1052#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1053#[serde(rename_all = "snake_case")]
1054pub enum PeerConnectionStatus {
1055    #[default]
1056    Disconnected,
1057    Connected,
1058}
1059
1060#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1061pub struct StatusResponse {
1062    pub server: ServerStatus,
1063    pub federation: Option<FederationStatus>,
1064}
1065
1066/// Archive of all the guardian config files that can be used to recover a lost
1067/// guardian node.
1068#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1069pub struct GuardianConfigBackup {
1070    #[serde(with = "fedimint_core::hex::serde")]
1071    pub tar_archive_bytes: Vec<u8>,
1072}
1073
1074#[cfg(test)]
1075mod tests {
1076    use std::fmt;
1077    use std::str::FromStr as _;
1078
1079    use fedimint_core::config::FederationId;
1080    use jsonrpsee_core::client::BatchResponse;
1081    use jsonrpsee_core::params::BatchRequestBuilder;
1082    use jsonrpsee_core::traits::ToRpcParams;
1083
1084    use super::*;
1085
1086    type Result<T = ()> = std::result::Result<T, JsonRpcClientError>;
1087
1088    #[apply(async_trait_maybe_send!)]
1089    trait SimpleClient: Sized {
1090        async fn connect() -> Result<Self>;
1091
1092        #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1093        async fn connect_with_tor() -> Result<Self>;
1094
1095        fn is_connected(&self) -> bool {
1096            true
1097        }
1098
1099        // reply with json
1100        async fn request(&self, method: &str) -> Result<String>;
1101    }
1102
1103    struct Client<C: SimpleClient>(C);
1104
1105    #[apply(async_trait_maybe_send!)]
1106    impl<C: SimpleClient + MaybeSend + MaybeSync> JsonRpcClient for Client<C> {
1107        async fn connect(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
1108            Ok(Self(C::connect().await?))
1109        }
1110
1111        #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1112        async fn connect_with_tor(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
1113            Ok(Self(C::connect_with_tor().await?))
1114        }
1115
1116        fn is_connected(&self) -> bool {
1117            self.0.is_connected()
1118        }
1119    }
1120
1121    #[apply(async_trait_maybe_send!)]
1122    impl<C: SimpleClient + MaybeSend + MaybeSync> ClientT for Client<C> {
1123        async fn request<R, P>(&self, method: &str, _params: P) -> Result<R>
1124        where
1125            R: jsonrpsee_core::DeserializeOwned,
1126            P: ToRpcParams + MaybeSend,
1127        {
1128            let json = self.0.request(method).await?;
1129            Ok(serde_json::from_str(&json).unwrap())
1130        }
1131
1132        async fn notification<P>(&self, _method: &str, _params: P) -> Result<()>
1133        where
1134            P: ToRpcParams + MaybeSend,
1135        {
1136            unimplemented!()
1137        }
1138
1139        async fn batch_request<'a, R>(
1140            &self,
1141            _batch: BatchRequestBuilder<'a>,
1142        ) -> std::result::Result<BatchResponse<'a, R>, jsonrpsee_core::client::Error>
1143        where
1144            R: DeserializeOwned + fmt::Debug + 'a,
1145        {
1146            unimplemented!()
1147        }
1148    }
1149
1150    #[test]
1151    fn converts_invite_code() {
1152        let connect = InviteCode::new(
1153            "ws://test1".parse().unwrap(),
1154            PeerId::from(1),
1155            FederationId::dummy(),
1156            Some("api_secret".into()),
1157        );
1158
1159        let bech32 = connect.to_string();
1160        let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
1161        assert_eq!(connect, connect_parsed);
1162
1163        let json = serde_json::to_string(&connect).unwrap();
1164        let connect_as_string: String = serde_json::from_str(&json).unwrap();
1165        assert_eq!(connect_as_string, bech32);
1166        let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
1167        assert_eq!(connect_parsed_json, connect_parsed);
1168    }
1169
1170    #[test]
1171    fn creates_essential_guardians_invite_code() {
1172        let mut peer_to_url_map = BTreeMap::new();
1173        peer_to_url_map.insert(PeerId::from(0), "ws://test1".parse().expect("URL fail"));
1174        peer_to_url_map.insert(PeerId::from(1), "ws://test2".parse().expect("URL fail"));
1175        peer_to_url_map.insert(PeerId::from(2), "ws://test3".parse().expect("URL fail"));
1176        peer_to_url_map.insert(PeerId::from(3), "ws://test4".parse().expect("URL fail"));
1177        let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;
1178
1179        let code =
1180            InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
1181
1182        assert_eq!(FederationId::dummy(), code.federation_id());
1183
1184        let expected_map: BTreeMap<PeerId, SafeUrl> =
1185            peer_to_url_map.into_iter().take(max_size).collect();
1186        assert_eq!(expected_map, code.peers());
1187    }
1188}