1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::fmt::Debug;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6use std::{cmp, result};
7
8use anyhow::anyhow;
9#[cfg(all(feature = "tor", not(target_family = "wasm")))]
10use arti_client::{TorAddr, TorClient, TorClientConfig};
11use base64::Engine as _;
12use bitcoin::hashes::sha256;
13use bitcoin::secp256k1;
14pub use error::{FederationError, OutputOutcomeError, PeerError};
15use fedimint_core::admin_client::{
16 ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
17 ServerStatus,
18};
19use fedimint_core::backup::ClientBackupSnapshot;
20use fedimint_core::core::backup::SignedBackupRequest;
21use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::endpoint_constants::AWAIT_OUTPUT_OUTCOME_ENDPOINT;
24use fedimint_core::fmt_utils::AbbreviateDebug;
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding};
29use fedimint_core::net::api_announcement::SignedApiAnnouncement;
30use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
31use fedimint_core::task::{MaybeSend, MaybeSync};
32use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
33use fedimint_core::util::SafeUrl;
34use fedimint_core::{
35 apply, async_trait_maybe_send, dyn_newtype_define, runtime, NumPeersExt, OutPoint, PeerId,
36 TransactionId,
37};
38use fedimint_logging::LOG_CLIENT_NET_API;
39use futures::stream::FuturesUnordered;
40use futures::{Future, StreamExt};
41use itertools::Itertools;
42use jsonrpsee_core::client::ClientT;
43pub use jsonrpsee_core::client::Error as JsonRpcClientError;
44use jsonrpsee_core::DeserializeOwned;
45#[cfg(target_family = "wasm")]
46use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
47#[cfg(not(target_family = "wasm"))]
48use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
49#[cfg(not(target_family = "wasm"))]
50use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
51use net::Connector;
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54#[cfg(not(target_family = "wasm"))]
55use tokio_rustls::rustls::RootCertStore;
56#[cfg(all(feature = "tor", not(target_family = "wasm")))]
57use tokio_rustls::{rustls::ClientConfig as TlsClientConfig, TlsConnector};
58use tracing::{debug, instrument, trace};
59
60use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
61mod error;
62mod global_api;
63pub mod net;
64mod peer;
65
66pub use global_api::{GlobalFederationApiWithCache, GlobalFederationApiWithCacheExt};
67use peer::FederationPeer;
68
69pub type PeerResult<T> = Result<T, PeerError>;
70pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
71pub type FederationResult<T> = Result<T, FederationError>;
72pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
73
74pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
75
76#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
80pub struct ApiVersionSet {
81 pub core: ApiVersion,
82 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
83}
84
85#[apply(async_trait_maybe_send!)]
87pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
88 fn all_peers(&self) -> &BTreeSet<PeerId>;
96
97 fn self_peer(&self) -> Option<PeerId>;
102
103 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
104
105 async fn request_raw(
107 &self,
108 peer_id: PeerId,
109 method: &str,
110 params: &[Value],
111 ) -> result::Result<Value, JsonRpcClientError>;
112}
113
114#[apply(async_trait_maybe_send!)]
117pub trait FederationApiExt: IRawFederationApi {
118 async fn request_single_peer(
121 &self,
122 timeout: Option<Duration>,
123 method: String,
124 params: ApiRequestErased,
125 peer_id: PeerId,
126 ) -> JsonRpcResult<jsonrpsee_core::JsonValue> {
127 let request = async {
128 self.request_raw(peer_id, &method, &[params.to_json()])
129 .await
130 };
131
132 if let Some(timeout) = timeout {
133 match fedimint_core::runtime::timeout(timeout, request).await {
134 Ok(result) => result,
135 Err(_timeout) => Err(JsonRpcClientError::RequestTimeout),
136 }
137 } else {
138 request.await
139 }
140 }
141 async fn request_single_peer_typed<Ret>(
142 &self,
143 timeout: Option<Duration>,
144 method: String,
145 params: ApiRequestErased,
146 peer_id: PeerId,
147 ) -> PeerResult<Ret>
148 where
149 Ret: DeserializeOwned,
150 {
151 self.request_single_peer(timeout, method, params, peer_id)
152 .await
153 .map_err(PeerError::Rpc)
154 .and_then(|v| {
155 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
156 })
157 }
158
159 async fn request_single_peer_federation<FedRet>(
162 &self,
163 timeout: Option<Duration>,
164 method: String,
165 params: ApiRequestErased,
166 peer_id: PeerId,
167 ) -> FederationResult<FedRet>
168 where
169 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
170 {
171 Ok(self
172 .request_single_peer(timeout, method.clone(), params.clone(), peer_id)
173 .await
174 .map_err(PeerError::Rpc)
175 .and_then(|v| {
176 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
177 })
178 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))?)
179 }
180
181 async fn request_with_strategy<PeerRet: serde::de::DeserializeOwned, FedRet: Debug>(
184 &self,
185 mut strategy: impl QueryStrategy<PeerRet, FedRet> + MaybeSend,
186 method: String,
187 params: ApiRequestErased,
188 ) -> FederationResult<FedRet> {
189 #[cfg(not(target_family = "wasm"))]
193 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
194 #[cfg(target_family = "wasm")]
195 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
196
197 let peers = self.all_peers();
198
199 for peer_id in peers {
200 futures.push(Box::pin(async {
201 let request = async {
202 self.request_raw(*peer_id, &method, &[params.to_json()])
203 .await
204 .map(AbbreviateDebug)
205 };
206
207 PeerResponse {
208 peer: *peer_id,
209 result: request.await,
210 }
211 }));
212 }
213
214 let mut peer_delay_ms = BTreeMap::new();
215
216 let max_delay_ms = 1000;
219 loop {
220 let response = futures.next().await;
221 trace!(target: LOG_CLIENT_NET_API, ?response, method, params = ?AbbreviateDebug(params.to_json()), "Received peer response");
222 match response {
223 Some(PeerResponse { peer, result }) => {
224 let result: PeerResult<PeerRet> =
225 result.map_err(PeerError::Rpc).and_then(|o| {
226 serde_json::from_value::<PeerRet>(o.0)
227 .map_err(|e| PeerError::ResponseDeserialization(e.into()))
228 });
229
230 let strategy_step = strategy.process(peer, result);
231 trace!(
232 target: LOG_CLIENT_NET_API,
233 method,
234 ?params,
235 ?strategy_step,
236 "Taking strategy step to the response after peer response"
237 );
238 match strategy_step {
239 QueryStep::Retry(peers) => {
240 for retry_peer in peers {
241 let mut delay_ms =
242 peer_delay_ms.get(&retry_peer).copied().unwrap_or(10);
243 delay_ms = cmp::min(max_delay_ms, delay_ms * 2);
244 peer_delay_ms.insert(retry_peer, delay_ms);
245
246 futures.push(Box::pin({
247 let method = &method;
248 let params = ¶ms;
249 async move {
250 runtime::sleep(Duration::from_millis(delay_ms)).await;
253 PeerResponse {
254 peer: retry_peer,
255 result: self
256 .request_raw(
257 retry_peer,
258 method,
259 &[params.to_json()],
260 )
261 .await
262 .map(AbbreviateDebug),
263 }
264 }
265 }));
266 }
267 }
268 QueryStep::Continue => {}
269 QueryStep::Failure { general, peers } => {
270 return Err(FederationError {
271 method: method.clone(),
272 params: params.params.clone(),
273 general,
274 peers,
275 })
276 }
277 QueryStep::Success(response) => return Ok(response),
278 }
279 }
280 None => {
281 panic!("Query strategy ran out of peers to query without returning a result");
282 }
283 }
284 }
285 }
286
287 async fn request_current_consensus<Ret>(
288 &self,
289 method: String,
290 params: ApiRequestErased,
291 ) -> FederationResult<Ret>
292 where
293 Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
294 {
295 self.request_with_strategy(
296 ThresholdConsensus::new(self.all_peers().to_num_peers()),
297 method,
298 params,
299 )
300 .await
301 }
302
303 async fn request_admin<Ret>(
304 &self,
305 method: &str,
306 params: ApiRequestErased,
307 auth: ApiAuth,
308 ) -> FederationResult<Ret>
309 where
310 Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
311 {
312 let Some(self_peer_id) = self.self_peer() else {
313 return Err(FederationError::general(
314 method,
315 params,
316 anyhow::format_err!("Admin peer_id not set"),
317 ));
318 };
319 self.request_single_peer_federation(
320 None,
321 method.into(),
322 params.with_auth(auth),
323 self_peer_id,
324 )
325 .await
326 }
327
328 async fn request_admin_no_auth<Ret>(
329 &self,
330 method: &str,
331 params: ApiRequestErased,
332 ) -> FederationResult<Ret>
333 where
334 Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
335 {
336 let Some(self_peer_id) = self.self_peer() else {
337 return Err(FederationError::general(
338 method,
339 params,
340 anyhow::format_err!("Admin peer_id not set"),
341 ));
342 };
343
344 self.request_single_peer_federation(None, method.into(), params, self_peer_id)
345 .await
346 }
347}
348
349#[apply(async_trait_maybe_send!)]
350impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
351
352pub trait IModuleFederationApi: IRawFederationApi {}
354
355dyn_newtype_define! {
356 #[derive(Clone)]
357 pub DynModuleApi(Arc<IModuleFederationApi>)
358}
359
360dyn_newtype_define! {
361 #[derive(Clone)]
362 pub DynGlobalApi(Arc<IGlobalFederationApi>)
363}
364
365impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
366 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
367 self.inner.as_ref()
368 }
369}
370
371impl DynGlobalApi {
372 pub fn new_admin(
373 peer: PeerId,
374 url: SafeUrl,
375 api_secret: &Option<String>,
376 connector: &Connector,
377 ) -> DynGlobalApi {
378 GlobalFederationApiWithCache::new(
379 WsFederationApi::new(connector, vec![(peer, url)], api_secret).with_self_peer_id(peer),
380 )
381 .into()
382 }
383
384 pub fn from_pre_peer_id_admin_endpoint(url: SafeUrl, api_secret: &Option<String>) -> Self {
388 let peer_id = PeerId::from(1024);
391 GlobalFederationApiWithCache::new(
392 WsFederationApi::new(&Connector::default(), vec![(peer_id, url)], api_secret)
393 .with_self_peer_id(peer_id),
394 )
395 .into()
396 }
397
398 pub fn from_single_endpoint(
399 peer: PeerId,
400 url: SafeUrl,
401 api_secret: &Option<String>,
402 connector: &Connector,
403 ) -> Self {
404 GlobalFederationApiWithCache::new(WsFederationApi::new(
405 connector,
406 vec![(peer, url)],
407 api_secret,
408 ))
409 .into()
410 }
411
412 pub fn from_endpoints(
413 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
414 api_secret: &Option<String>,
415 connector: &Connector,
416 ) -> Self {
417 GlobalFederationApiWithCache::new(WsFederationApi::new(connector, peers, api_secret)).into()
418 }
419
420 pub fn from_invite_code(connector: &Connector, invite_code: &InviteCode) -> Self {
421 GlobalFederationApiWithCache::new(WsFederationApi::new(
422 connector,
423 invite_code.peers().into_iter().collect_vec(),
424 &invite_code.api_secret(),
425 ))
426 .into()
427 }
428
429 pub async fn await_output_outcome<R>(
430 &self,
431 outpoint: OutPoint,
432 timeout: Duration,
433 module_decoder: &Decoder,
434 ) -> OutputOutcomeResult<R>
435 where
436 R: OutputOutcome,
437 {
438 fedimint_core::runtime::timeout(timeout, async {
439 let outcome: SerdeOutputOutcome = self
440 .inner
441 .request_current_consensus(
442 AWAIT_OUTPUT_OUTCOME_ENDPOINT.to_owned(),
443 ApiRequestErased::new(outpoint),
444 )
445 .await
446 .map_err(OutputOutcomeError::Federation)?;
447
448 deserialize_outcome(&outcome, module_decoder)
449 })
450 .await
451 .map_err(|_| OutputOutcomeError::Timeout(timeout))?
452 }
453}
454
455#[apply(async_trait_maybe_send!)]
457pub trait IGlobalFederationApi: IRawFederationApi {
458 async fn submit_transaction(
459 &self,
460 tx: Transaction,
461 ) -> FederationResult<SerdeModuleEncoding<TransactionSubmissionOutcome>>;
462
463 async fn await_block(
464 &self,
465 block_index: u64,
466 decoders: &ModuleDecoderRegistry,
467 ) -> anyhow::Result<SessionOutcome>;
468
469 async fn get_session_status(
470 &self,
471 block_index: u64,
472 decoders: &ModuleDecoderRegistry,
473 ) -> anyhow::Result<SessionStatus>;
474
475 async fn session_count(&self) -> FederationResult<u64>;
476
477 async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId>;
478
479 async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash>;
481
482 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
483
484 async fn download_backup(
485 &self,
486 id: &secp256k1::PublicKey,
487 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
488
489 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
493
494 async fn set_config_gen_connections(
499 &self,
500 info: ConfigGenConnectionsRequest,
501 auth: ApiAuth,
502 ) -> FederationResult<()>;
503
504 async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;
512
513 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;
518
519 async fn get_default_config_gen_params(
522 &self,
523 auth: ApiAuth,
524 ) -> FederationResult<ConfigGenParamsRequest>;
525
526 async fn set_config_gen_params(
530 &self,
531 requested: ConfigGenParamsRequest,
532 auth: ApiAuth,
533 ) -> FederationResult<()>;
534
535 async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse>;
539
540 async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
544
545 async fn get_verify_config_hash(
548 &self,
549 auth: ApiAuth,
550 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
551
552 async fn verified_configs(
555 &self,
556 auth: ApiAuth,
557 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
558
559 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
565
566 async fn status(&self) -> FederationResult<StatusResponse>;
568
569 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
571
572 async fn guardian_config_backup(&self, auth: ApiAuth)
574 -> FederationResult<GuardianConfigBackup>;
575
576 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
578
579 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
580
581 async fn submit_api_announcement(
583 &self,
584 peer_id: PeerId,
585 announcement: SignedApiAnnouncement,
586 ) -> FederationResult<()>;
587
588 async fn api_announcements(
589 &self,
590 guardian: PeerId,
591 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
592
593 async fn sign_api_announcement(
594 &self,
595 api_url: SafeUrl,
596 auth: ApiAuth,
597 ) -> FederationResult<SignedApiAnnouncement>;
598
599 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
600
601 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
603}
604
605pub fn deserialize_outcome<R>(
606 outcome: &SerdeOutputOutcome,
607 module_decoder: &Decoder,
608) -> OutputOutcomeResult<R>
609where
610 R: OutputOutcome + MaybeSend,
611{
612 let dyn_outcome = outcome
613 .try_into_inner_known_module_kind(module_decoder)
614 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
615
616 let source_instance = dyn_outcome.module_instance_id();
617
618 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
619 let target_type = std::any::type_name::<R>();
620 OutputOutcomeError::ResponseDeserialization(anyhow!(
621 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
622 ))
623 })
624}
625
626#[derive(Debug, Clone)]
630pub struct WsFederationApi<C = WsClient> {
631 peer_ids: BTreeSet<PeerId>,
632 self_peer_id: Option<PeerId>,
633 peers: Arc<Vec<FederationPeer<C>>>,
634 module_id: Option<ModuleInstanceId>,
635}
636
637impl<C: JsonRpcClient + Debug + 'static> IModuleFederationApi for WsFederationApi<C> {}
638
639#[apply(async_trait_maybe_send!)]
643impl<C: JsonRpcClient + Debug + 'static> IRawFederationApi for WsFederationApi<C> {
644 fn all_peers(&self) -> &BTreeSet<PeerId> {
645 &self.peer_ids
646 }
647
648 fn self_peer(&self) -> Option<PeerId> {
649 self.self_peer_id
650 }
651
652 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
653 WsFederationApi {
654 peer_ids: self.peer_ids.clone(),
655 peers: self.peers.clone(),
656 module_id: Some(id),
657 self_peer_id: self.self_peer_id,
658 }
659 .into()
660 }
661
662 async fn request_raw(
663 &self,
664 peer_id: PeerId,
665 method: &str,
666 params: &[Value],
667 ) -> JsonRpcResult<Value> {
668 let peer = self
669 .peers
670 .iter()
671 .find(|m| m.peer_id == peer_id)
672 .ok_or_else(|| JsonRpcClientError::Custom(format!("Invalid peer_id: {peer_id}")))?;
673
674 let method = match self.module_id {
675 None => method.to_string(),
676 Some(id) => format!("module_{id}_{method}"),
677 };
678 peer.request(&method, params).await
679 }
680}
681
682#[apply(async_trait_maybe_send!)]
683pub trait JsonRpcClient: ClientT + Sized + MaybeSend + MaybeSync {
684 async fn connect(
685 url: &SafeUrl,
686 api_secret: Option<String>,
687 ) -> result::Result<Self, JsonRpcClientError>;
688
689 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
690 async fn connect_with_tor(
691 url: &SafeUrl,
692 api_secret: Option<String>,
693 ) -> result::Result<Self, JsonRpcClientError>;
694
695 fn is_connected(&self) -> bool;
696}
697
698#[apply(async_trait_maybe_send!)]
699impl JsonRpcClient for WsClient {
700 async fn connect(
701 url: &SafeUrl,
702 api_secret: Option<String>,
703 ) -> result::Result<Self, JsonRpcClientError> {
704 #[cfg(not(target_family = "wasm"))]
705 let mut client = {
706 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
707 let mut root_certs = RootCertStore::empty();
708 root_certs.extend(webpki_roots);
709
710 let tls_cfg = CustomCertStore::builder()
711 .with_root_certificates(root_certs)
712 .with_no_client_auth();
713
714 WsClientBuilder::default()
715 .max_concurrent_requests(u16::MAX as usize)
716 .with_custom_cert_store(tls_cfg)
717 };
718
719 #[cfg(target_family = "wasm")]
720 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
721
722 if let Some(api_secret) = api_secret {
723 #[cfg(not(target_family = "wasm"))]
724 {
725 let mut headers = HeaderMap::new();
728
729 let auth = base64::engine::general_purpose::STANDARD
730 .encode(format!("fedimint:{api_secret}"));
731
732 headers.insert(
733 "Authorization",
734 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
735 );
736
737 client = client.set_headers(headers);
738 }
739 #[cfg(target_family = "wasm")]
740 {
741 let mut url = url.clone();
744 url.set_username("fedimint").map_err(|_| {
745 JsonRpcClientError::Transport(anyhow::format_err!("invalid username").into())
746 })?;
747 url.set_password(Some(&api_secret)).map_err(|_| {
748 JsonRpcClientError::Transport(anyhow::format_err!("invalid secret").into())
749 })?;
750 return client.build(url.as_str()).await;
751 }
752 }
753 client.build(url.as_str()).await
754 }
755
756 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
757 async fn connect_with_tor(
758 url: &SafeUrl,
759 api_secret: Option<String>,
760 ) -> result::Result<Self, JsonRpcClientError> {
761 let tor_config = TorClientConfig::default();
762 let tor_client = TorClient::create_bootstrapped(tor_config)
763 .await
764 .map_err(|e| JsonRpcClientError::Transport(e.into()))?
765 .isolated_client();
766
767 debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
768
769 let addr = (
772 url.host_str()
773 .expect("It should've asserted for `host` on construction"),
774 url.port_or_known_default()
775 .expect("It should've asserted for `port`, or used a default one, on construction"),
776 );
777 let tor_addr = TorAddr::from(addr).map_err(|e| JsonRpcClientError::Transport(e.into()))?;
778 let tor_addr_clone = tor_addr.clone();
779
780 debug!(
781 ?tor_addr,
782 ?addr,
783 "Successfully created `TorAddr` for given address (i.e. host and port)"
784 );
785
786 let anonymized_stream = if url.is_onion_address() {
789 let mut stream_prefs = arti_client::StreamPrefs::default();
790 stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
791
792 let anonymized_stream = tor_client
793 .connect_with_prefs(tor_addr, &stream_prefs)
794 .await
795 .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
796
797 debug!(
798 ?tor_addr_clone,
799 "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
800 );
801 anonymized_stream
802 } else {
803 let anonymized_stream = tor_client
804 .connect(tor_addr)
805 .await
806 .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
807
808 debug!(?tor_addr_clone, "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`");
809 anonymized_stream
810 };
811
812 let is_tls = match url.scheme() {
813 "wss" => true,
814 "ws" => false,
815 unexpected_scheme => {
816 let error =
817 format!("`{unexpected_scheme}` not supported, it's expected `ws` or `wss`!");
818 return Err(JsonRpcClientError::Transport(anyhow!(error).into()));
819 }
820 };
821
822 let tls_connector = if is_tls {
823 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
824 let mut root_certs = RootCertStore::empty();
825 root_certs.extend(webpki_roots);
826
827 let tls_config = TlsClientConfig::builder()
828 .with_root_certificates(root_certs)
829 .with_no_client_auth();
830 let tls_connector = TlsConnector::from(Arc::new(tls_config));
831 Some(tls_connector)
832 } else {
833 None
834 };
835
836 let mut ws_client_builder =
837 WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
838
839 if let Some(api_secret) = api_secret {
840 let mut headers = HeaderMap::new();
843
844 let auth =
845 base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
846
847 headers.insert(
848 "Authorization",
849 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
850 );
851
852 ws_client_builder = ws_client_builder.set_headers(headers);
853 }
854
855 match tls_connector {
856 None => {
857 return ws_client_builder
858 .build_with_stream(url.as_str(), anonymized_stream)
859 .await;
860 }
861 Some(tls_connector) => {
862 let host = url.host_str().map(ToOwned::to_owned).ok_or_else(|| {
863 JsonRpcClientError::Transport(anyhow!("Invalid host!").into())
864 })?;
865
866 let server_name = rustls_pki_types::ServerName::try_from(host)
869 .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
870
871 let anonymized_tls_stream = tls_connector
872 .connect(server_name, anonymized_stream)
873 .await
874 .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
875
876 return ws_client_builder
877 .build_with_stream(url.as_str(), anonymized_tls_stream)
878 .await;
879 }
880 }
881 }
882
883 fn is_connected(&self) -> bool {
884 self.is_connected()
885 }
886}
887
888impl WsFederationApi<WsClient> {
889 pub fn new(
891 connector: &Connector,
892 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
893 api_secret: &Option<String>,
894 ) -> Self {
895 Self::new_with_client(connector, peers, None, api_secret)
896 }
897
898 pub fn new_admin(
899 peer: PeerId,
900 url: SafeUrl,
901 api_secret: &Option<String>,
902 connector: &Connector,
903 ) -> Self {
904 WsFederationApi::new(connector, vec![(peer, url)], api_secret).with_self_peer_id(peer)
905 }
906
907 pub fn from_endpoints(
908 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
909 api_secret: &Option<String>,
910 connector: &Connector,
911 ) -> Self {
912 WsFederationApi::new(connector, peers, api_secret)
913 }
914
915 pub fn with_self_peer_id(self, self_peer_id: PeerId) -> Self {
916 Self {
917 self_peer_id: Some(self_peer_id),
918 ..self
919 }
920 }
921}
922
923impl<C> WsFederationApi<C>
924where
925 C: JsonRpcClient + 'static,
926{
927 pub fn peers(&self) -> Vec<PeerId> {
929 self.peers.iter().map(|peer| peer.peer_id).collect()
930 }
931
932 pub fn new_with_client(
934 connector: &Connector,
935 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
936 self_peer_id: Option<PeerId>,
937 api_secret: &Option<String>,
938 ) -> Self {
939 let (peer_connections, peer_ids) = peers
940 .into_iter()
941 .map(|(peer_id, url)| {
942 assert!(
943 url.port_or_known_default().is_some(),
944 "API client requires a port"
945 );
946 assert!(url.host().is_some(), "API client requires a target host");
947
948 (
949 FederationPeer::new(*connector, url, peer_id, api_secret.clone()),
950 peer_id,
951 )
952 })
953 .unzip();
954
955 WsFederationApi {
956 peer_ids,
957 self_peer_id,
958 peers: Arc::new(peer_connections),
959 module_id: None,
960 }
961 }
962}
963
964#[derive(Debug)]
965pub struct PeerResponse<R> {
966 pub peer: PeerId,
967 pub result: JsonRpcResult<R>,
968}
969
970impl<C> FederationPeer<C>
971where
972 C: JsonRpcClient + 'static,
973{
974 #[instrument(level = "trace", fields(peer = %self.peer_id, %method), skip_all)]
975 pub async fn request(&self, method: &str, params: &[Value]) -> JsonRpcResult<Value> {
976 const RETRIES: usize = 1;
979
980 for attempts in 0.. {
981 debug_assert!(attempts <= RETRIES);
985
986 let rclient = self.client.read().await;
987 match rclient.client.get_try().await {
988 Ok(client) if client.is_connected() => {
989 return client.request::<_, _>(method, params).await;
990 }
991 Err(e) => {
992 if RETRIES <= attempts {
993 return Err(JsonRpcClientError::Transport(e.into()));
994 }
995 debug!(target: LOG_CLIENT_NET_API, err=%e, "Triggering reconnection after connection error");
996 }
997 Ok(_client) => {
998 if RETRIES <= attempts {
999 return Err(JsonRpcClientError::Transport(
1000 anyhow::format_err!("Disconnected").into(),
1001 ));
1002 }
1003 debug!(target: LOG_CLIENT_NET_API, "Triggering reconnection after disconnection");
1004 }
1005 };
1006
1007 drop(rclient);
1009 let mut wclient = self.client.write().await;
1010 match wclient.client.get_try().await {
1011 Ok(client) if client.is_connected() => {
1012 trace!(target: LOG_CLIENT_NET_API, "Some other request reconnected client, retrying");
1014 }
1015 _ => {
1016 wclient.reconnect(
1017 self.connector,
1018 self.peer_id,
1019 self.url.clone(),
1020 self.api_secret.clone(),
1021 );
1022 }
1023 }
1024 }
1025
1026 unreachable!();
1027 }
1028}
1029
1030#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1032pub struct FederationStatus {
1033 pub session_count: u64,
1034 pub status_by_peer: HashMap<PeerId, PeerStatus>,
1035 pub peers_online: u64,
1036 pub peers_offline: u64,
1037 pub peers_flagged: u64,
1040 pub scheduled_shutdown: Option<u64>,
1041}
1042
1043#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1044pub struct PeerStatus {
1045 pub last_contribution: Option<u64>,
1046 pub connection_status: PeerConnectionStatus,
1047 pub flagged: bool,
1050}
1051
1052#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1053#[serde(rename_all = "snake_case")]
1054pub enum PeerConnectionStatus {
1055 #[default]
1056 Disconnected,
1057 Connected,
1058}
1059
1060#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1061pub struct StatusResponse {
1062 pub server: ServerStatus,
1063 pub federation: Option<FederationStatus>,
1064}
1065
1066#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1069pub struct GuardianConfigBackup {
1070 #[serde(with = "fedimint_core::hex::serde")]
1071 pub tar_archive_bytes: Vec<u8>,
1072}
1073
1074#[cfg(test)]
1075mod tests {
1076 use std::fmt;
1077 use std::str::FromStr as _;
1078
1079 use fedimint_core::config::FederationId;
1080 use jsonrpsee_core::client::BatchResponse;
1081 use jsonrpsee_core::params::BatchRequestBuilder;
1082 use jsonrpsee_core::traits::ToRpcParams;
1083
1084 use super::*;
1085
1086 type Result<T = ()> = std::result::Result<T, JsonRpcClientError>;
1087
1088 #[apply(async_trait_maybe_send!)]
1089 trait SimpleClient: Sized {
1090 async fn connect() -> Result<Self>;
1091
1092 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1093 async fn connect_with_tor() -> Result<Self>;
1094
1095 fn is_connected(&self) -> bool {
1096 true
1097 }
1098
1099 async fn request(&self, method: &str) -> Result<String>;
1101 }
1102
1103 struct Client<C: SimpleClient>(C);
1104
1105 #[apply(async_trait_maybe_send!)]
1106 impl<C: SimpleClient + MaybeSend + MaybeSync> JsonRpcClient for Client<C> {
1107 async fn connect(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
1108 Ok(Self(C::connect().await?))
1109 }
1110
1111 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1112 async fn connect_with_tor(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
1113 Ok(Self(C::connect_with_tor().await?))
1114 }
1115
1116 fn is_connected(&self) -> bool {
1117 self.0.is_connected()
1118 }
1119 }
1120
1121 #[apply(async_trait_maybe_send!)]
1122 impl<C: SimpleClient + MaybeSend + MaybeSync> ClientT for Client<C> {
1123 async fn request<R, P>(&self, method: &str, _params: P) -> Result<R>
1124 where
1125 R: jsonrpsee_core::DeserializeOwned,
1126 P: ToRpcParams + MaybeSend,
1127 {
1128 let json = self.0.request(method).await?;
1129 Ok(serde_json::from_str(&json).unwrap())
1130 }
1131
1132 async fn notification<P>(&self, _method: &str, _params: P) -> Result<()>
1133 where
1134 P: ToRpcParams + MaybeSend,
1135 {
1136 unimplemented!()
1137 }
1138
1139 async fn batch_request<'a, R>(
1140 &self,
1141 _batch: BatchRequestBuilder<'a>,
1142 ) -> std::result::Result<BatchResponse<'a, R>, jsonrpsee_core::client::Error>
1143 where
1144 R: DeserializeOwned + fmt::Debug + 'a,
1145 {
1146 unimplemented!()
1147 }
1148 }
1149
1150 #[test]
1151 fn converts_invite_code() {
1152 let connect = InviteCode::new(
1153 "ws://test1".parse().unwrap(),
1154 PeerId::from(1),
1155 FederationId::dummy(),
1156 Some("api_secret".into()),
1157 );
1158
1159 let bech32 = connect.to_string();
1160 let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
1161 assert_eq!(connect, connect_parsed);
1162
1163 let json = serde_json::to_string(&connect).unwrap();
1164 let connect_as_string: String = serde_json::from_str(&json).unwrap();
1165 assert_eq!(connect_as_string, bech32);
1166 let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
1167 assert_eq!(connect_parsed_json, connect_parsed);
1168 }
1169
1170 #[test]
1171 fn creates_essential_guardians_invite_code() {
1172 let mut peer_to_url_map = BTreeMap::new();
1173 peer_to_url_map.insert(PeerId::from(0), "ws://test1".parse().expect("URL fail"));
1174 peer_to_url_map.insert(PeerId::from(1), "ws://test2".parse().expect("URL fail"));
1175 peer_to_url_map.insert(PeerId::from(2), "ws://test3".parse().expect("URL fail"));
1176 peer_to_url_map.insert(PeerId::from(3), "ws://test4".parse().expect("URL fail"));
1177 let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;
1178
1179 let code =
1180 InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
1181
1182 assert_eq!(FederationId::dummy(), code.federation_id());
1183
1184 let expected_map: BTreeMap<PeerId, SafeUrl> =
1185 peer_to_url_map.into_iter().take(max_size).collect();
1186 assert_eq!(expected_map, code.peers());
1187 }
1188}