1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::fmt::{self, Debug};
3use std::iter::once;
4use std::pin::Pin;
5use std::result;
6use std::sync::Arc;
7
8use anyhow::{anyhow, Context};
9#[cfg(all(feature = "tor", not(target_family = "wasm")))]
10use arti_client::{TorAddr, TorClient, TorClientConfig};
11use async_channel::bounded;
12use async_trait::async_trait;
13use base64::Engine as _;
14use bitcoin::hashes::sha256;
15use bitcoin::secp256k1;
16pub use error::{FederationError, OutputOutcomeError, PeerError};
17use fedimint_core::admin_client::{
18 ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
19 ServerStatus,
20};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding};
28use fedimint_core::net::api_announcement::SignedApiAnnouncement;
29use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
30use fedimint_core::task::{MaybeSend, MaybeSync};
31use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
32use fedimint_core::util::backoff_util::api_networking_backoff;
33use fedimint_core::util::{FmtCompact as _, SafeUrl};
34use fedimint_core::{
35 apply, async_trait_maybe_send, dyn_newtype_define, util, NumPeersExt, PeerId, TransactionId,
36};
37use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API};
38use futures::channel::oneshot;
39use futures::future::pending;
40use futures::stream::FuturesUnordered;
41use futures::{Future, StreamExt};
42use jsonrpsee_core::client::ClientT;
43pub use jsonrpsee_core::client::Error as JsonRpcClientError;
44use jsonrpsee_core::DeserializeOwned;
45use jsonrpsee_types::ErrorCode;
46#[cfg(target_family = "wasm")]
47use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
48#[cfg(not(target_family = "wasm"))]
49use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
50#[cfg(not(target_family = "wasm"))]
51use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
52use net::Connector;
53use serde::{Deserialize, Serialize};
54use serde_json::Value;
55#[cfg(not(target_family = "wasm"))]
56use tokio_rustls::rustls::RootCertStore;
57#[cfg(all(feature = "tor", not(target_family = "wasm")))]
58use tokio_rustls::{rustls::ClientConfig as TlsClientConfig, TlsConnector};
59use tracing::{debug, info, instrument, trace, trace_span, warn, Instrument};
60
61use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
62mod error;
63mod global_api;
64pub mod net;
65
66pub use global_api::{GlobalFederationApiWithCache, GlobalFederationApiWithCacheExt};
67
68pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
69
70pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
71 ApiVersion { major: 0, minor: 1 };
72
/// Result of a request made to a single peer.
pub type PeerResult<T> = Result<T, PeerError>;
/// Result carrying the raw JSON-RPC client error type.
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
/// Result of a request made to the federation as a whole.
pub type FederationResult<T> = Result<T, FederationError>;
/// Serde-friendly encoding of a dynamically typed output outcome.
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result of decoding an output outcome, see [`deserialize_outcome`].
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
79
/// A set of API versions: one for the core API and one per module instance.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Version of the core (non-module) API.
    pub core: ApiVersion,
    /// Version of each module's API, keyed by module instance id.
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
88
/// Lowest-level federation API abstraction: it knows the set of peers and can
/// send a single untyped JSON request to one of them.
///
/// Higher-level, typed helpers are layered on top via [`FederationApiExt`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Returns the ids of all peers this API can talk to.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Returns the id of "our own" peer, if one is set.
    ///
    /// The admin helpers in [`FederationApiExt`] fail with "Admin peer_id not
    /// set" when this returns `None`.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle scoped to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to `peer_id` and returns the raw JSON
    /// response without interpreting it.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
117
118#[apply(async_trait_maybe_send!)]
121pub trait FederationApiExt: IRawFederationApi {
122 async fn request_single_peer<Ret>(
123 &self,
124 method: String,
125 params: ApiRequestErased,
126 peer: PeerId,
127 ) -> PeerResult<Ret>
128 where
129 Ret: DeserializeOwned,
130 {
131 self.request_raw(peer, &method, ¶ms)
132 .await
133 .and_then(|v| {
134 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
135 })
136 }
137
138 async fn request_single_peer_federation<FedRet>(
139 &self,
140 method: String,
141 params: ApiRequestErased,
142 peer_id: PeerId,
143 ) -> FederationResult<FedRet>
144 where
145 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
146 {
147 self.request_raw(peer_id, &method, ¶ms)
148 .await
149 .and_then(|v| {
150 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
151 })
152 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
153 }
154
155 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
158 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
159 &self,
160 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
161 method: String,
162 params: ApiRequestErased,
163 ) -> FederationResult<FR> {
164 #[cfg(not(target_family = "wasm"))]
168 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
169 #[cfg(target_family = "wasm")]
170 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
171
172 for peer in self.all_peers() {
173 futures.push(Box::pin({
174 let method = &method;
175 let params = ¶ms;
176 async move {
177 let result = self
178 .request_single_peer(method.clone(), params.clone(), *peer)
179 .await;
180
181 (*peer, result)
182 }
183 }));
184 }
185
186 let mut peer_errors = BTreeMap::new();
187 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
188
189 loop {
190 let (peer, result) = futures
191 .next()
192 .await
193 .expect("Query strategy ran out of peers to query without returning a result");
194
195 match result {
196 Ok(response) => match strategy.process(peer, response) {
197 QueryStep::Retry(peers) => {
198 for peer in peers {
199 futures.push(Box::pin({
200 let method = &method;
201 let params = ¶ms;
202 async move {
203 let result = self
204 .request_single_peer(method.clone(), params.clone(), peer)
205 .await;
206
207 (peer, result)
208 }
209 }));
210 }
211 }
212 QueryStep::Success(response) => return Ok(response),
213 QueryStep::Failure(e) => {
214 peer_errors.insert(peer, e);
215 }
216 QueryStep::Continue => {}
217 },
218 Err(e) => {
219 e.report_if_unusual(peer, "RequestWithStrategy");
220 peer_errors.insert(peer, e);
221 }
222 }
223
224 if peer_errors.len() == peer_error_threshold {
225 return Err(FederationError::peer_errors(
226 method.clone(),
227 params.params.clone(),
228 peer_errors,
229 ));
230 }
231 }
232 }
233
234 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
235 &self,
236 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
237 method: String,
238 params: ApiRequestErased,
239 ) -> FR {
240 #[cfg(not(target_family = "wasm"))]
244 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
245 #[cfg(target_family = "wasm")]
246 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
247
248 for peer in self.all_peers() {
249 futures.push(Box::pin({
250 let method = &method;
251 let params = ¶ms;
252 async move {
253 let response = util::retry(
254 "api-request-{method}-{peer}",
255 api_networking_backoff(),
256 || async {
257 self.request_single_peer(method.clone(), params.clone(), *peer)
258 .await
259 .inspect_err(|e| {
260 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
261 })
262 .map_err(|e| anyhow!(e.to_string()))
263 },
264 )
265 .await
266 .expect("Number of retries has no limit");
267
268 (*peer, response)
269 }
270 }));
271 }
272
273 loop {
274 let (peer, response) = match futures.next().await {
275 Some(t) => t,
276 None => pending().await,
277 };
278
279 match strategy.process(peer, response) {
280 QueryStep::Retry(peers) => {
281 for peer in peers {
282 futures.push(Box::pin({
283 let method = &method;
284 let params = ¶ms;
285 async move {
286 let response = util::retry(
287 "api-request-{method}-{peer}",
288 api_networking_backoff(),
289 || async {
290 self.request_single_peer(
291 method.clone(),
292 params.clone(),
293 peer,
294 )
295 .await
296 .inspect_err(|err| {
297 if err.is_unusual() {
298 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
299 }
300 })
301 .map_err(|e| anyhow!(e.to_string()))
302 },
303 )
304 .await
305 .expect("Number of retries has no limit");
306
307 (peer, response)
308 }
309 }));
310 }
311 }
312 QueryStep::Success(response) => return response,
313 QueryStep::Failure(e) => {
314 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
315 }
316 QueryStep::Continue => {}
317 }
318 }
319 }
320
321 async fn request_current_consensus<Ret>(
322 &self,
323 method: String,
324 params: ApiRequestErased,
325 ) -> FederationResult<Ret>
326 where
327 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
328 {
329 self.request_with_strategy(
330 ThresholdConsensus::new(self.all_peers().to_num_peers()),
331 method,
332 params,
333 )
334 .await
335 }
336
337 async fn request_current_consensus_retry<Ret>(
338 &self,
339 method: String,
340 params: ApiRequestErased,
341 ) -> Ret
342 where
343 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
344 {
345 self.request_with_strategy_retry(
346 ThresholdConsensus::new(self.all_peers().to_num_peers()),
347 method,
348 params,
349 )
350 .await
351 }
352
353 async fn request_admin<Ret>(
354 &self,
355 method: &str,
356 params: ApiRequestErased,
357 auth: ApiAuth,
358 ) -> FederationResult<Ret>
359 where
360 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
361 {
362 let Some(self_peer_id) = self.self_peer() else {
363 return Err(FederationError::general(
364 method,
365 params,
366 anyhow::format_err!("Admin peer_id not set"),
367 ));
368 };
369
370 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
371 .await
372 }
373
374 async fn request_admin_no_auth<Ret>(
375 &self,
376 method: &str,
377 params: ApiRequestErased,
378 ) -> FederationResult<Ret>
379 where
380 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
381 {
382 let Some(self_peer_id) = self.self_peer() else {
383 return Err(FederationError::general(
384 method,
385 params,
386 anyhow::format_err!("Admin peer_id not set"),
387 ));
388 };
389
390 self.request_single_peer_federation(method.into(), params, self_peer_id)
391 .await
392 }
393}
394
// Blanket impl: every `IRawFederationApi` (sized or not, so trait objects
// too) gets the typed helper methods of `FederationApiExt` for free.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
397
/// Marker trait for a raw federation API handed out to modules; it adds no
/// methods on top of [`IRawFederationApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
400
// `DynModuleApi`: cloneable, `Arc`-backed newtype around a dynamic
// `IModuleFederationApi`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}

// `DynGlobalApi`: cloneable, `Arc`-backed newtype around a dynamic
// `IGlobalFederationApi`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
410
411impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
412 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
413 self.inner.as_ref()
414 }
415}
416
417impl DynGlobalApi {
418 pub fn new_admin(
419 peer: PeerId,
420 url: SafeUrl,
421 api_secret: &Option<String>,
422 connector: &Connector,
423 ) -> DynGlobalApi {
424 GlobalFederationApiWithCache::new(ReconnectFederationApi::from_endpoints(
425 once((peer, url)),
426 api_secret,
427 connector,
428 Some(peer),
429 ))
430 .into()
431 }
432
433 pub fn from_pre_peer_id_admin_endpoint(url: SafeUrl, api_secret: &Option<String>) -> Self {
437 Self::new_admin(PeerId::from(1024), url, api_secret, &Connector::default())
441 }
442
443 pub fn from_endpoints(
444 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
445 api_secret: &Option<String>,
446 connector: &Connector,
447 ) -> Self {
448 GlobalFederationApiWithCache::new(ReconnectFederationApi::from_endpoints(
449 peers, api_secret, connector, None,
450 ))
451 .into()
452 }
453}
454
/// The global (non-module) federation API.
///
/// NOTE(review): this trait only declares signatures; the per-method notes
/// below are inferred from names and types and should be confirmed against
/// the implementation (not visible in this part of the file).
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Fetches the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders`; presumably waits until it is available — TODO confirm.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Fetches the [`SessionStatus`] for `block_index`, decoded with
    /// `decoders`.
    ///
    /// NOTE(review): `core_api_version` presumably selects between the v1 and
    /// v2 endpoint, and `broadcast_public_keys` is presumably used to verify
    /// the response — confirm against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count reported by the federation.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Resolves with a [`TransactionId`] for `txid`; presumably waits until
    /// the transaction has been processed — TODO confirm.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Returns a SHA-256 hash; per the name, the consensus hash of the server
    /// config.
    async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash>;

    /// Uploads a signed client backup request.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup snapshot stored under `id`, as reported by each
    /// peer (`None` where a peer reports no snapshot).
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Admin call taking `auth`; per the name, sets the guardian password.
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call; per the name, sets the connection info used during config
    /// generation.
    async fn set_config_gen_connections(
        &self,
        info: ConfigGenConnectionsRequest,
        auth: ApiAuth,
    ) -> FederationResult<()>;

    /// Per the name, registers `peer` as a config generation participant.
    async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;

    /// Returns the list of config generation peers.
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;

    /// Admin call returning a [`ConfigGenParamsRequest`]; per the name, the
    /// default config generation parameters.
    async fn get_default_config_gen_params(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<ConfigGenParamsRequest>;

    /// Admin call; per the name, sets the requested config generation
    /// parameters.
    async fn set_config_gen_params(
        &self,
        requested: ConfigGenParamsRequest,
        auth: ApiAuth,
    ) -> FederationResult<()>;

    /// Returns a [`ConfigGenParamsResponse`]; per the name, the config
    /// generation parameters agreed on by consensus.
    async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse>;

    /// Admin call; per the name, runs distributed key generation.
    async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call returning one SHA-256 hash per peer; per the name, the
    /// hashes used to verify the generated config.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call returning one SHA-256 hash per peer.
    /// NOTE(review): exact semantics ("verified configs") not visible here.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call; per the name, starts consensus.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns a `StatusResponse`.
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Admin call returning an [`AuditSummary`].
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Admin call returning a `GuardianConfigBackup`.
    async fn guardian_config_backup(&self, auth: ApiAuth)
        -> FederationResult<GuardianConfigBackup>;

    /// Admin call with no payload besides `auth`; presumably checks that the
    /// credentials are accepted — TODO confirm.
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call; per the name, restarts the federation setup.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API announcement associated with `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Returns signed API announcements keyed by peer; `guardian` is
    /// presumably the peer that is asked — TODO confirm.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Admin call returning a [`SignedApiAnnouncement`] for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Admin call; per the name, shuts the server down.
    /// NOTE(review): the meaning of `session` (and of `None`) is not visible
    /// here.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns a version string for `peer_id`; per the name, its `fedimintd`
    /// version.
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Admin call returning [`BackupStatistics`].
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
}
609
610pub fn deserialize_outcome<R>(
611 outcome: &SerdeOutputOutcome,
612 module_decoder: &Decoder,
613) -> OutputOutcomeResult<R>
614where
615 R: OutputOutcome + MaybeSend,
616{
617 let dyn_outcome = outcome
618 .try_into_inner_known_module_kind(module_decoder)
619 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
620
621 let source_instance = dyn_outcome.module_instance_id();
622
623 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
624 let target_type = std::any::type_name::<R>();
625 OutputOutcomeError::ResponseDeserialization(anyhow!(
626 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
627 ))
628 })
629}
630
/// Connector that reaches peers through websocket JSON-RPC clients.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// API endpoint of each known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional shared secret, sent as HTTP basic auth (user `fedimint`) when
    /// connecting.
    api_secret: Option<String>,
}

impl WebsocketConnector {
    /// Creates a connector for the given peer endpoints and optional API
    /// secret. No connection is opened here.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
642
643#[async_trait]
644impl IClientConnector for WebsocketConnector {
645 fn peers(&self) -> BTreeSet<PeerId> {
646 self.peers.keys().copied().collect()
647 }
648
649 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
650 let api_endpoint = self
651 .peers
652 .get(&peer_id)
653 .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;
654
655 #[cfg(not(target_family = "wasm"))]
656 let mut client = {
657 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
658 let mut root_certs = RootCertStore::empty();
659 root_certs.extend(webpki_roots);
660
661 let tls_cfg = CustomCertStore::builder()
662 .with_root_certificates(root_certs)
663 .with_no_client_auth();
664
665 WsClientBuilder::default()
666 .max_concurrent_requests(u16::MAX as usize)
667 .with_custom_cert_store(tls_cfg)
668 };
669
670 #[cfg(target_family = "wasm")]
671 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
672
673 if let Some(api_secret) = &self.api_secret {
674 #[cfg(not(target_family = "wasm"))]
675 {
676 let mut headers = HeaderMap::new();
679
680 let auth = base64::engine::general_purpose::STANDARD
681 .encode(format!("fedimint:{api_secret}"));
682
683 headers.insert(
684 "Authorization",
685 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
686 );
687
688 client = client.set_headers(headers);
689 }
690 #[cfg(target_family = "wasm")]
691 {
692 let mut url = api_endpoint.clone();
695 url.set_username("fedimint")
696 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
697 url.set_password(Some(&api_secret))
698 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
699
700 let client = client
701 .build(url.as_str())
702 .await
703 .map_err(|err| PeerError::InternalClientError(err.into()))?;
704
705 return Ok(client.into_dyn());
706 }
707 }
708
709 let client = client
710 .build(api_endpoint.as_str())
711 .await
712 .map_err(|err| PeerError::InternalClientError(err.into()))?;
713
714 Ok(client.into_dyn())
715 }
716}
717
/// Connector that reaches peers through anonymized Tor streams (only built
/// with the `tor` feature on non-wasm targets).
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// API endpoint of each known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional shared secret, sent as HTTP basic auth (user `fedimint`) when
    /// connecting.
    api_secret: Option<String>,
}

#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for the given peer endpoints and optional API
    /// secret. No Tor client is created here.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
731
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Returns the ids of all peers with a configured endpoint.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.iter().map(|(peer_id, _)| *peer_id).collect()
    }

    /// Opens a websocket connection to `peer_id` over a fresh, isolated Tor
    /// client.
    ///
    /// Steps: bootstrap a `TorClient`, open an anonymized stream to the
    /// endpoint's host and port (with onion-service support enabled for onion
    /// addresses), optionally wrap it in TLS for `wss`, then run the websocket
    /// handshake on top. A configured API secret is sent as HTTP basic auth
    /// for user `fedimint`.
    ///
    /// # Errors
    ///
    /// * [`PeerError::InternalClientError`] if `peer_id` is unknown or the Tor
    ///   client cannot be bootstrapped,
    /// * [`PeerError::InvalidEndpoint`] for a missing host/port, an invalid
    ///   address or server name, or a scheme other than `ws`/`wss`,
    /// * [`PeerError::Connection`] if the Tor stream, the TLS handshake or the
    ///   websocket handshake fails.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let Some(api_endpoint) = self.peers.get(&peer_id) else {
            return Err(PeerError::InternalClientError(anyhow!(
                "Invalid peer_id: {peer_id}"
            )));
        };

        let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        let endpoint_host = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let endpoint_port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (endpoint_host, endpoint_port);

        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is consumed by the connect call; keep a copy for logging.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion services have to be enabled explicitly on the stream.
            let mut onion_prefs = arti_client::StreamPrefs::default();
            onion_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let onion_stream = tor_client
                .connect_with_prefs(tor_addr, &onion_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            onion_stream
        } else {
            let clearnet_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(?tor_addr_clone, "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`");
            clearnet_stream
        };

        // `wss` needs a TLS layer on top of the Tor stream, `ws` does not.
        let tls_connector = match api_endpoint.scheme() {
            "wss" => {
                let mut trusted_roots = RootCertStore::empty();
                trusted_roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

                let tls_config = TlsClientConfig::builder()
                    .with_root_certificates(trusted_roots)
                    .with_no_client_auth();

                Some(TlsConnector::from(Arc::new(tls_config)))
            }
            "ws" => None,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        let ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        let ws_client_builder = match &self.api_secret {
            Some(api_secret) => {
                let credentials = base64::engine::general_purpose::STANDARD
                    .encode(format!("fedimint:{api_secret}"));

                let mut auth_headers = HeaderMap::new();
                auth_headers.insert(
                    "Authorization",
                    HeaderValue::from_str(&format!("Basic {credentials}")).expect("Can't fail"),
                );

                ws_client_builder.set_headers(auth_headers)
            }
            None => ws_client_builder,
        };

        let Some(tls_connector) = tls_connector else {
            // Plain `ws`: websocket handshake directly on the Tor stream.
            let plain_client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            return Ok(plain_client.into_dyn());
        };

        let tls_host = api_endpoint
            .host_str()
            .map(ToOwned::to_owned)
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

        let server_name = rustls_pki_types::ServerName::try_from(tls_host)
            .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

        let anonymized_tls_stream = tls_connector
            .connect(server_name, anonymized_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        let tls_client = ws_client_builder
            .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        Ok(tls_client.into_dyn())
    }
}
880
881fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
882 match jsonrpc_error {
883 JsonRpcClientError::Call(error_object) => {
884 let error = anyhow!(error_object.message().to_owned());
885 match ErrorCode::from(error_object.code()) {
886 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
887 PeerError::InvalidRequest(error)
888 }
889 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
890 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
891 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
892 PeerError::ServerError(error)
893 }
894 }
895 }
896 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
897 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
898 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
899 JsonRpcClientError::InvalidSubscriptionId => todo!(),
900 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
901 PeerError::InvalidRequest(anyhow!(invalid_request_id))
902 }
903 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
904 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
905 JsonRpcClientError::HttpNotImplemented => {
906 PeerError::ServerError(anyhow!("Http not implemented"))
907 }
908 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
909 PeerError::InvalidRequest(anyhow!(empty_batch_request))
910 }
911 JsonRpcClientError::RegisterMethod(register_method_error) => {
912 PeerError::InvalidResponse(anyhow!(register_method_error))
913 }
914 }
915}
916
917#[async_trait]
918impl IClientConnection for WsClient {
919 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
920 let method = match method {
921 ApiMethod::Core(method) => method,
922 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
923 };
924
925 Ok(ClientT::request(self, &method, [request.to_json()])
926 .await
927 .map_err(jsonrpc_error_to_peer_error)?)
928 }
929
930 async fn await_disconnection(&self) {
931 self.on_disconnect().await;
932 }
933}
934
/// Identifies an API method, either on the core API or on a module instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ApiMethod {
    /// A core (non-module) method, by name.
    Core(String),
    /// A method of the given module instance, by name.
    Module(ModuleInstanceId, String),
}
940
941impl fmt::Display for ApiMethod {
942 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943 match self {
944 ApiMethod::Core(s) => f.write_str(s),
945 ApiMethod::Module(module_id, s) => f.write_fmt(format_args!("{module_id}-{s}")),
946 }
947 }
948}
949
/// Shared, dynamically typed [`IClientConnector`].
pub type DynClientConnector = Arc<dyn IClientConnector>;

/// Knows a set of peers and how to open a connection to each of them.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static {
    /// Returns the ids of all peers this connector can connect to.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Opens a new connection to `peer`.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps `self` in an `Arc` to obtain a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
967
/// Shared, dynamically typed [`IClientConnection`].
pub type DynClientConnection = Arc<dyn IClientConnection>;

/// An established connection to a single peer.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for `method` over this connection and returns the raw
    /// JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Resolves once this connection has been lost.
    async fn await_disconnection(&self);

    /// Wraps `self` in an `Arc` to obtain a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
983
/// [`IRawFederationApi`] implementation backed by per-peer connections that
/// are (re-)established by background tasks.
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Ids of all peers known to the connector.
    peers: BTreeSet<PeerId>,
    /// Our own peer id when used as an admin API, returned by `self_peer`.
    admin_id: Option<PeerId>,
    /// When set, requests are sent as methods of this module instance;
    /// otherwise as core methods.
    module_id: Option<ModuleInstanceId>,
    /// Connection handles, one per peer.
    connections: ReconnectClientConnections,
}
991
992impl ReconnectFederationApi {
993 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
994 Self {
995 peers: connector.peers(),
996 admin_id,
997 module_id: None,
998 connections: ReconnectClientConnections::new(connector),
999 }
1000 }
1001
1002 pub fn new_admin(
1003 peer: PeerId,
1004 url: SafeUrl,
1005 api_secret: &Option<String>,
1006 connector: &Connector,
1007 ) -> Self {
1008 Self::from_endpoints(once((peer, url)), api_secret, connector, Some(peer))
1009 }
1010
1011 pub fn from_endpoints(
1012 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1013 api_secret: &Option<String>,
1014 connector: &Connector,
1015 admin_id: Option<PeerId>,
1016 ) -> Self {
1017 let connector = match connector {
1018 Connector::Tcp => {
1019 WebsocketConnector::new(peers.into_iter().collect(), api_secret.clone()).into_dyn()
1020 }
1021 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1022 Connector::Tor => {
1023 TorConnector::new(peers.into_iter().collect(), api_secret.clone()).into_dyn()
1024 }
1025 #[cfg(all(feature = "tor", target_family = "wasm"))]
1026 Connector::Tor => unimplemented!(),
1027 };
1028
1029 ReconnectFederationApi::new(&connector, admin_id)
1030 }
1031}
1032
1033impl IModuleFederationApi for ReconnectFederationApi {}
1034
1035#[apply(async_trait_maybe_send!)]
1036impl IRawFederationApi for ReconnectFederationApi {
1037 fn all_peers(&self) -> &BTreeSet<PeerId> {
1038 &self.peers
1039 }
1040
1041 fn self_peer(&self) -> Option<PeerId> {
1042 self.admin_id
1043 }
1044
1045 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1046 ReconnectFederationApi {
1047 peers: self.peers.clone(),
1048 admin_id: self.admin_id,
1049 module_id: Some(id),
1050 connections: self.connections.clone(),
1051 }
1052 .into()
1053 }
1054
1055 async fn request_raw(
1056 &self,
1057 peer_id: PeerId,
1058 method: &str,
1059 params: &ApiRequestErased,
1060 ) -> PeerResult<Value> {
1061 let method = match self.module_id {
1062 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1063 None => ApiMethod::Core(method.to_string()),
1064 };
1065
1066 self.connections
1067 .request(peer_id, method, params.clone())
1068 .await
1069 }
1070}
1071
/// One [`ClientConnection`] handle per peer.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// Connection handle for each peer known to the connector.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1076
1077impl ReconnectClientConnections {
1078 pub fn new(connector: &DynClientConnector) -> Self {
1079 ReconnectClientConnections {
1080 connections: connector
1081 .peers()
1082 .into_iter()
1083 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1084 .collect(),
1085 }
1086 }
1087
1088 async fn request(
1089 &self,
1090 peer: PeerId,
1091 method: ApiMethod,
1092 request: ApiRequestErased,
1093 ) -> PeerResult<Value> {
1094 trace!(target: LOG_NET_API, %method, "Api request");
1095 let res = self
1096 .connections
1097 .get(&peer)
1098 .unwrap_or_else(|| panic!("Could not find client connection for peer {peer}"))
1099 .connection()
1100 .await
1101 .context("Failed to connect to peer")
1102 .map_err(PeerError::Connection)?
1103 .request(method.clone(), request)
1104 .await;
1105
1106 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1107
1108 res
1109 }
1110}
1111
/// Handle to the background task that maintains the connection to one peer.
///
/// Callers ask for the current connection by sending a oneshot sender through
/// `sender`; the task answers with a clone of the live connection.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Request channel into the background task.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}

impl ClientConnection {
    /// Spawns the "peer-api-connection" task for `peer` and returns a handle
    /// to it.
    ///
    /// The task connects lazily: it waits for the first connection request,
    /// then connects through `connector`. The task ends once the request
    /// channel is closed.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Outer loop: one iteration per connection attempt, started by
                // an incoming connection request.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Collect every request that is already queued so they are
                    // all served by the same connection attempt.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            // A failed send only means the requester went away.
                            for sender in senders {
                                sender.send(connection.clone()).ok();
                            }

                            // Serve further requests from the live connection
                            // until it drops or the request channel closes.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // A successful connection resets the backoff.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // `senders` is dropped unanswered at the end of this
                            // iteration, so the waiting `connection()` calls
                            // resolve to `None`.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                info!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }

    /// Requests the current connection from the background task.
    ///
    /// Returns `None` if the task drops the request without answering it
    /// (e.g. because the connection attempt failed).
    ///
    /// # Panics
    ///
    /// Panics if the request channel to the background task is closed.
    async fn connection(&self) -> Option<DynClientConnection> {
        let (sender, receiver) = oneshot::channel();

        self.sender
            .send(sender)
            .await
            .expect("Api connection request channel closed unexpectedly");

        receiver.await.ok()
    }
}
1189
#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
mod iroh {
    //! Client connector and connection implementation on top of iroh.

    use std::collections::{BTreeMap, BTreeSet};

    use async_trait::async_trait;
    use bitcoin::key::rand::rngs::OsRng;
    use fedimint_core::module::{ApiError, ApiRequestErased};
    use fedimint_core::PeerId;
    use iroh::endpoint::Connection;
    use iroh::{Endpoint, NodeId, SecretKey};
    use serde::{Deserialize, Serialize};
    use serde_json::Value;

    use super::{
        ApiMethod, DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult,
    };

    /// ALPN identifier used both when binding the endpoint and when
    /// connecting to a peer.
    const FEDIMINT_ALPN: &[u8] = b"FEDIMINT_ALPN";

    /// Connector that reaches peers through an iroh [`Endpoint`].
    #[derive(Debug, Clone)]
    pub struct IrohConnector {
        /// Iroh node id of every known peer.
        node_ids: BTreeMap<PeerId, NodeId>,
        /// Local endpoint that outgoing connections are opened from.
        endpoint: Endpoint,
    }

    impl IrohConnector {
        /// Binds a local endpoint with a freshly generated secret key and
        /// remembers the node id of each peer in `peers`.
        ///
        /// # Errors
        ///
        /// Fails if binding the endpoint fails.
        #[allow(unused)]
        pub async fn new(peers: BTreeMap<PeerId, NodeId>) -> anyhow::Result<Self> {
            let secret_key = SecretKey::generate(&mut OsRng);

            let endpoint = Endpoint::builder()
                .discovery_n0()
                .secret_key(secret_key)
                .alpns(vec![FEDIMINT_ALPN.to_vec()])
                .bind()
                .await?;

            Ok(Self {
                node_ids: peers,
                endpoint,
            })
        }
    }

    #[async_trait]
    impl IClientConnector for IrohConnector {
        /// Ids of all peers this connector has a node id for.
        fn peers(&self) -> BTreeSet<PeerId> {
            self.node_ids.keys().copied().collect::<BTreeSet<_>>()
        }

        /// Opens a connection to `peer_id`.
        ///
        /// # Errors
        ///
        /// `PeerError::InvalidPeerId` if the peer is unknown,
        /// `PeerError::Connection` if the endpoint fails to connect.
        async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
            let Some(&node_id) = self.node_ids.get(&peer_id) else {
                return Err(PeerError::InvalidPeerId { peer_id });
            };

            match self.endpoint.connect(node_id, FEDIMINT_ALPN).await {
                Ok(connection) => Ok(connection.into_dyn()),
                Err(err) => Err(PeerError::Connection(err)),
            }
        }
    }

    /// JSON payload written to the stream for a single API call.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct IrohRequest {
        method: ApiMethod,
        request: ApiRequestErased,
    }

    #[async_trait]
    impl IClientConnection for Connection {
        /// Performs one API call on a new bidirectional stream: the request
        /// is written as JSON, the sending side is finished, and the whole
        /// response is read and decoded.
        ///
        /// # Errors
        ///
        /// `PeerError::Transport` for stream failures and
        /// `PeerError::InvalidResponse` if the response cannot be decoded or
        /// carries an API error.
        async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
            let payload = serde_json::to_vec(&IrohRequest { method, request })
                .expect("Serialization to vec can't fail");

            let (mut send_half, mut recv_half) = self
                .open_bi()
                .await
                .map_err(|err| PeerError::Transport(err.into()))?;

            send_half
                .write_all(&payload)
                .await
                .map_err(|err| PeerError::Transport(err.into()))?;

            send_half
                .finish()
                .map_err(|err| PeerError::Transport(err.into()))?;

            // The argument is the size limit for the response.
            let raw_response = recv_half
                .read_to_end(1_000_000)
                .await
                .map_err(|err| PeerError::Transport(err.into()))?;

            // The response is decoded as a serialized `Result`.
            let decoded = serde_json::from_slice::<Result<Value, ApiError>>(&raw_response)
                .map_err(|err| PeerError::InvalidResponse(err.into()))?;

            match decoded {
                Ok(value) => Ok(value),
                Err(api_error) => Err(PeerError::InvalidResponse(anyhow::anyhow!(
                    "Api Error: {:?}",
                    api_error
                ))),
            }
        }

        /// Resolves once the connection has been closed.
        async fn await_disconnection(&self) {
            let _close_reason = self.closed().await;
        }
    }
}
1292
/// Federation-wide status: a session counter, the status of each peer and
/// aggregate peer counters.
///
/// The derived `Default` is an empty status (all counters `0`, no peers, no
/// scheduled shutdown).
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct FederationStatus {
    /// Session counter.
    /// NOTE(review): presumably the number of consensus sessions — confirm.
    pub session_count: u64,
    /// Status of the individual peers, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, PeerStatus>,
    /// Number of peers counted as online.
    pub peers_online: u64,
    /// Number of peers counted as offline.
    pub peers_offline: u64,
    /// Number of peers counted as flagged.
    /// NOTE(review): presumably mirrors `PeerStatus::flagged` — confirm
    /// against the code that fills this struct.
    pub peers_flagged: u64,
    /// Scheduled shutdown, if any.
    /// NOTE(review): the meaning of the `u64` is not visible here — confirm.
    pub scheduled_shutdown: Option<u64>,
}
1305
/// Status of a single peer.
///
/// The derived `Default` is a disconnected, unflagged peer without a recorded
/// contribution.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Last contribution of the peer, if any.
    /// NOTE(review): the meaning of the `u64` is not visible here — confirm.
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected.
    pub connection_status: P2PConnectionStatus,
    /// Whether the peer is flagged.
    /// NOTE(review): the flagging criteria are not visible here — confirm.
    pub flagged: bool,
}
1314
/// Connection state of a peer.
///
/// Serialized with snake_case variant names, i.e. `"disconnected"` and
/// `"connected"`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum P2PConnectionStatus {
    /// Not connected; this is the default value.
    #[default]
    Disconnected,
    /// Connected.
    Connected,
}
1322
/// Status response combining the server status with an optional federation
/// status.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatus,
    /// Federation status, when available.
    /// NOTE(review): the conditions under which this is `None` are not
    /// visible here — confirm against the server side.
    pub federation: Option<FederationStatus>,
}
1328
/// Backup of a guardian's configuration, carried as raw bytes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
    /// Raw backup bytes, (de)serialized through `fedimint_core::hex::serde`.
    /// NOTE(review): presumably a tar archive of the config files, as the
    /// field name suggests — confirm.
    #[serde(with = "fedimint_core::hex::serde")]
    pub tar_archive_bytes: Vec<u8>,
}
1336
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use fedimint_core::config::FederationId;
    use fedimint_core::invite_code::InviteCode;

    use super::*;

    /// An invite code survives a round trip through its string form and
    /// through JSON, where it is represented as that same string.
    #[test]
    fn converts_invite_code() {
        let invite = InviteCode::new(
            "ws://test1".parse().unwrap(),
            PeerId::from(1),
            FederationId::dummy(),
            Some("api_secret".into()),
        );

        // String round trip.
        let encoded = invite.to_string();
        let decoded = InviteCode::from_str(&encoded).expect("parses");
        assert_eq!(invite, decoded);

        // JSON round trip: the JSON value is a plain string holding the
        // encoded invite code.
        let json = serde_json::to_string(&invite).unwrap();
        let json_as_string: String = serde_json::from_str(&json).unwrap();
        assert_eq!(json_as_string, encoded);
        let decoded_from_json: InviteCode = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded_from_json, decoded);
    }

    /// With four peers, the invite code contains only the first
    /// `max_evil() + 1` peers.
    #[test]
    fn creates_essential_guardians_invite_code() {
        let peer_urls: BTreeMap<PeerId, SafeUrl> = BTreeMap::from([
            (PeerId::from(0), "ws://test1".parse().expect("URL fail")),
            (PeerId::from(1), "ws://test2".parse().expect("URL fail")),
            (PeerId::from(2), "ws://test3".parse().expect("URL fail")),
            (PeerId::from(3), "ws://test4".parse().expect("URL fail")),
        ]);
        let essential_count = peer_urls.to_num_peers().max_evil() + 1;

        let invite =
            InviteCode::new_with_essential_num_guardians(&peer_urls, FederationId::dummy());

        assert_eq!(FederationId::dummy(), invite.federation_id());

        let expected_peers: BTreeMap<PeerId, SafeUrl> =
            peer_urls.into_iter().take(essential_count).collect();
        assert_eq!(expected_peers, invite.peers());
    }
}