use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::{self, Debug, Display};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, result};
use anyhow::anyhow;
use base64::Engine as _;
use bitcoin::hashes::sha256;
use bitcoin::secp256k1;
use fedimint_core::admin_client::{
ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
ServerStatus,
};
use fedimint_core::backup::ClientBackupSnapshot;
use fedimint_core::core::backup::SignedBackupRequest;
use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::endpoint_constants::AWAIT_OUTPUT_OUTCOME_ENDPOINT;
use fedimint_core::fmt_utils::{AbbreviateDebug, AbbreviateJson};
use fedimint_core::invite_code::InviteCode;
use fedimint_core::module::audit::AuditSummary;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding};
use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
use fedimint_core::util::SafeUrl;
use fedimint_core::{
apply, async_trait_maybe_send, dyn_newtype_define, runtime, NumPeersExt, OutPoint, PeerId,
TransactionId,
};
use fedimint_logging::LOG_CLIENT_NET_API;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use itertools::Itertools;
use jsonrpsee_core::client::{ClientT, Error as JsonRpcClientError};
use jsonrpsee_core::DeserializeOwned;
#[cfg(target_family = "wasm")]
use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
#[cfg(not(target_family = "wasm"))]
use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
#[cfg(not(target_family = "wasm"))]
use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
#[cfg(not(target_family = "wasm"))]
use tokio_rustls::rustls::RootCertStore;
use tracing::{debug, error, instrument, trace, warn};
use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
mod federation_peer_client;
mod global_federation_api_with_cache;
use federation_peer_client::FederationPeer;
use fedimint_core::net::api_announcement::SignedApiAnnouncement;
use global_federation_api_with_cache::GlobalFederationApiWithCache;
pub type PeerResult<T> = Result<T, PeerError>;
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
pub type FederationResult<T> = Result<T, FederationError>;
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
#[derive(Debug, Error)]
pub enum PeerError {
#[error("Response deserialization error: {0}")]
ResponseDeserialization(anyhow::Error),
#[error("Invalid peer id: {peer_id}")]
InvalidPeerId { peer_id: PeerId },
#[error("Rpc error: {0}")]
Rpc(#[from] JsonRpcClientError),
#[error("Invalid response: {0}")]
InvalidResponse(String),
}
impl PeerError {
pub fn report_if_important(&self, peer_id: PeerId) {
let important = match self {
PeerError::ResponseDeserialization(_)
| PeerError::InvalidPeerId { .. }
| PeerError::InvalidResponse(_) => true,
PeerError::Rpc(rpc_e) => match rpc_e {
JsonRpcClientError::Transport(_) | JsonRpcClientError::RequestTimeout => false,
JsonRpcClientError::RestartNeeded(_)
| JsonRpcClientError::Call(_)
| JsonRpcClientError::ParseError(_)
| JsonRpcClientError::InvalidSubscriptionId
| JsonRpcClientError::InvalidRequestId(_)
| JsonRpcClientError::Custom(_)
| JsonRpcClientError::HttpNotImplemented
| JsonRpcClientError::EmptyBatchRequest(_)
| JsonRpcClientError::RegisterMethod(_) => true,
},
};
trace!(target: LOG_CLIENT_NET_API, error = %self, "PeerError");
if important {
warn!(target: LOG_CLIENT_NET_API, error = %self, %peer_id, "Unusual PeerError");
}
}
}
#[derive(Debug, Error)]
pub struct FederationError {
method: String,
params: serde_json::Value,
general: Option<anyhow::Error>,
peers: BTreeMap<PeerId, PeerError>,
}
impl Display for FederationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Federation rpc error {")?;
if let Some(general) = self.general.as_ref() {
f.write_fmt(format_args!("method => {}), ", self.method))?;
f.write_fmt(format_args!(
"params => {:?}), ",
AbbreviateJson(&self.params)
))?;
f.write_fmt(format_args!("general => {general})"))?;
if !self.peers.is_empty() {
f.write_str(", ")?;
}
}
for (i, (peer, e)) in self.peers.iter().enumerate() {
f.write_fmt(format_args!("{peer} => {e})"))?;
if i == self.peers.len() - 1 {
f.write_str(", ")?;
}
}
f.write_str("}")?;
Ok(())
}
}
impl FederationError {
pub fn general(
method: impl Into<String>,
params: impl Serialize,
e: impl Into<anyhow::Error>,
) -> FederationError {
FederationError {
method: method.into(),
params: serde_json::to_value(params).unwrap_or_default(),
general: Some(e.into()),
peers: BTreeMap::default(),
}
}
pub fn new_one_peer(
peer_id: PeerId,
method: impl Into<String>,
params: impl Serialize,
error: PeerError,
) -> Self {
Self {
method: method.into(),
params: serde_json::to_value(params).expect("Serialization of valid params won't fail"),
general: None,
peers: [(peer_id, error)].into_iter().collect(),
}
}
pub fn report_if_important(&self) {
if let Some(error) = self.general.as_ref() {
warn!(target: LOG_CLIENT_NET_API, %error, "General FederationError");
}
for (peer_id, e) in &self.peers {
e.report_if_important(*peer_id);
}
}
pub fn get_general_error(&self) -> Option<&anyhow::Error> {
self.general.as_ref()
}
pub fn get_peer_errors(&self) -> impl Iterator<Item = (PeerId, &PeerError)> {
self.peers.iter().map(|(peer, error)| (*peer, error))
}
}
type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
#[derive(Debug, Error)]
pub enum OutputOutcomeError {
#[error("Response deserialization error: {0}")]
ResponseDeserialization(anyhow::Error),
#[error("Federation error: {0}")]
Federation(#[from] FederationError),
#[error("Core error: {0}")]
Core(#[from] anyhow::Error),
#[error("Transaction rejected: {0}")]
Rejected(String),
#[error("Invalid output index {out_idx}, larger than {outputs_num} in the transaction")]
InvalidVout { out_idx: u64, outputs_num: usize },
#[error("Timeout reached after waiting {}s", .0.as_secs())]
Timeout(Duration),
}
impl OutputOutcomeError {
pub fn report_if_important(&self) {
let important = match self {
OutputOutcomeError::Federation(e) => {
e.report_if_important();
return;
}
OutputOutcomeError::Core(_)
| OutputOutcomeError::InvalidVout { .. }
| OutputOutcomeError::ResponseDeserialization(_) => true,
OutputOutcomeError::Rejected(_) | OutputOutcomeError::Timeout(_) => false,
};
trace!(target: LOG_CLIENT_NET_API, error = %self, "OutputOutcomeError");
if important {
warn!(target: LOG_CLIENT_NET_API, error = %self, "Uncommon OutputOutcomeError");
}
}
pub fn is_rejected(&self) -> bool {
matches!(
self,
OutputOutcomeError::Rejected(_) | OutputOutcomeError::InvalidVout { .. }
)
}
}
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
fn all_peers(&self) -> &BTreeSet<PeerId>;
fn self_peer(&self) -> Option<PeerId>;
fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
async fn request_raw(
&self,
peer_id: PeerId,
method: &str,
params: &[Value],
) -> result::Result<Value, JsonRpcClientError>;
}
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
pub core: ApiVersion,
pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
#[apply(async_trait_maybe_send!)]
pub trait FederationApiExt: IRawFederationApi {
async fn request_single_peer(
&self,
timeout: Option<Duration>,
method: String,
params: ApiRequestErased,
peer_id: PeerId,
) -> JsonRpcResult<jsonrpsee_core::JsonValue> {
let request = async {
self.request_raw(peer_id, &method, &[params.to_json()])
.await
};
if let Some(timeout) = timeout {
match fedimint_core::runtime::timeout(timeout, request).await {
Ok(result) => result,
Err(_timeout) => Err(JsonRpcClientError::RequestTimeout),
}
} else {
request.await
}
}
async fn request_single_peer_typed<Ret>(
&self,
timeout: Option<Duration>,
method: String,
params: ApiRequestErased,
peer_id: PeerId,
) -> PeerResult<Ret>
where
Ret: DeserializeOwned,
{
self.request_single_peer(timeout, method, params, peer_id)
.await
.map_err(PeerError::Rpc)
.and_then(|v| {
serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
})
}
async fn request_single_peer_federation<FedRet>(
&self,
timeout: Option<Duration>,
method: String,
params: ApiRequestErased,
peer_id: PeerId,
) -> FederationResult<FedRet>
where
FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
Ok(self
.request_single_peer(timeout, method.clone(), params.clone(), peer_id)
.await
.map_err(PeerError::Rpc)
.and_then(|v| {
serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
})
.map_err(|e| FederationError::new_one_peer(peer_id, method, params, e))?)
}
async fn request_with_strategy<PeerRet: serde::de::DeserializeOwned, FedRet: Debug>(
&self,
mut strategy: impl QueryStrategy<PeerRet, FedRet> + MaybeSend,
method: String,
params: ApiRequestErased,
) -> FederationResult<FedRet> {
#[cfg(not(target_family = "wasm"))]
let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
#[cfg(target_family = "wasm")]
let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
let peers = self.all_peers();
for peer_id in peers {
futures.push(Box::pin(async {
let request = async {
self.request_raw(*peer_id, &method, &[params.to_json()])
.await
.map(AbbreviateDebug)
};
PeerResponse {
peer: *peer_id,
result: request.await,
}
}));
}
let mut peer_delay_ms = BTreeMap::new();
let max_delay_ms = 1000;
loop {
let response = futures.next().await;
trace!(target: LOG_CLIENT_NET_API, ?response, method, params = ?AbbreviateDebug(params.to_json()), "Received peer response");
match response {
Some(PeerResponse { peer, result }) => {
let result: PeerResult<PeerRet> =
result.map_err(PeerError::Rpc).and_then(|o| {
serde_json::from_value::<PeerRet>(o.0)
.map_err(|e| PeerError::ResponseDeserialization(e.into()))
});
let strategy_step = strategy.process(peer, result);
trace!(
target: LOG_CLIENT_NET_API,
method,
?params,
?strategy_step,
"Taking strategy step to the response after peer response"
);
match strategy_step {
QueryStep::Retry(peers) => {
for retry_peer in peers {
let mut delay_ms =
peer_delay_ms.get(&retry_peer).copied().unwrap_or(10);
delay_ms = cmp::min(max_delay_ms, delay_ms * 2);
peer_delay_ms.insert(retry_peer, delay_ms);
futures.push(Box::pin({
let method = &method;
let params = ¶ms;
async move {
runtime::sleep(Duration::from_millis(delay_ms)).await;
PeerResponse {
peer: retry_peer,
result: self
.request_raw(
retry_peer,
method,
&[params.to_json()],
)
.await
.map(AbbreviateDebug),
}
}
}));
}
}
QueryStep::Continue => {}
QueryStep::Failure { general, peers } => {
return Err(FederationError {
method: method.clone(),
params: params.params.clone(),
general,
peers,
})
}
QueryStep::Success(response) => return Ok(response),
}
}
None => {
panic!("Query strategy ran out of peers to query without returning a result");
}
}
}
}
async fn request_current_consensus<Ret>(
&self,
method: String,
params: ApiRequestErased,
) -> FederationResult<Ret>
where
Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
self.request_with_strategy(
ThresholdConsensus::new(self.all_peers().to_num_peers()),
method,
params,
)
.await
}
async fn request_admin<Ret>(
&self,
method: &str,
params: ApiRequestErased,
auth: ApiAuth,
) -> FederationResult<Ret>
where
Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
let Some(self_peer_id) = self.self_peer() else {
return Err(FederationError::general(
method,
params,
anyhow::format_err!("Admin peer_id not set"),
));
};
self.request_single_peer_federation(
None,
method.into(),
params.with_auth(auth),
self_peer_id,
)
.await
}
async fn request_admin_no_auth<Ret>(
&self,
method: &str,
params: ApiRequestErased,
) -> FederationResult<Ret>
where
Ret: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
let Some(self_peer_id) = self.self_peer() else {
return Err(FederationError::general(
method,
params,
anyhow::format_err!("Admin peer_id not set"),
));
};
self.request_single_peer_federation(None, method.into(), params, self_peer_id)
.await
}
}
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
pub trait IModuleFederationApi: IRawFederationApi {}
dyn_newtype_define! {
#[derive(Clone)]
pub DynModuleApi(Arc<IModuleFederationApi>)
}
dyn_newtype_define! {
#[derive(Clone)]
pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
self.inner.as_ref()
}
}
impl DynGlobalApi {
pub fn new_admin(peer: PeerId, url: SafeUrl, api_secret: &Option<String>) -> DynGlobalApi {
GlobalFederationApiWithCache::new(
WsFederationApi::new(vec![(peer, url)], api_secret).with_self_peer_id(peer),
)
.into()
}
pub fn from_pre_peer_id_admin_endpoint(url: SafeUrl, api_secret: &Option<String>) -> Self {
let peer_id = PeerId::from(1024);
GlobalFederationApiWithCache::new(
WsFederationApi::new(vec![(peer_id, url)], api_secret).with_self_peer_id(peer_id),
)
.into()
}
pub fn from_single_endpoint(peer: PeerId, url: SafeUrl, api_secret: &Option<String>) -> Self {
GlobalFederationApiWithCache::new(WsFederationApi::new(vec![(peer, url)], api_secret))
.into()
}
pub fn from_endpoints(
peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
api_secret: &Option<String>,
) -> Self {
GlobalFederationApiWithCache::new(WsFederationApi::new(peers, api_secret)).into()
}
pub fn from_invite_code(invite_code: &InviteCode) -> Self {
GlobalFederationApiWithCache::new(WsFederationApi::new(
invite_code.peers().into_iter().collect_vec(),
&invite_code.api_secret(),
))
.into()
}
pub async fn await_output_outcome<R>(
&self,
outpoint: OutPoint,
timeout: Duration,
module_decoder: &Decoder,
) -> OutputOutcomeResult<R>
where
R: OutputOutcome,
{
fedimint_core::runtime::timeout(timeout, async {
let outcome: SerdeOutputOutcome = self
.inner
.request_current_consensus(
AWAIT_OUTPUT_OUTCOME_ENDPOINT.to_owned(),
ApiRequestErased::new(outpoint),
)
.await
.map_err(OutputOutcomeError::Federation)?;
deserialize_outcome(&outcome, module_decoder)
})
.await
.map_err(|_| OutputOutcomeError::Timeout(timeout))?
}
}
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
async fn submit_transaction(
&self,
tx: Transaction,
) -> FederationResult<SerdeModuleEncoding<TransactionSubmissionOutcome>>;
async fn await_block(
&self,
block_index: u64,
decoders: &ModuleDecoderRegistry,
) -> anyhow::Result<SessionOutcome>;
async fn get_session_status(
&self,
block_index: u64,
decoders: &ModuleDecoderRegistry,
) -> anyhow::Result<SessionStatus>;
async fn session_count(&self) -> FederationResult<u64>;
async fn await_transaction(&self, txid: TransactionId) -> FederationResult<TransactionId>;
async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash>;
async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
async fn download_backup(
&self,
id: &secp256k1::PublicKey,
) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
async fn set_config_gen_connections(
&self,
info: ConfigGenConnectionsRequest,
auth: ApiAuth,
) -> FederationResult<()>;
async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;
async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;
async fn get_default_config_gen_params(
&self,
auth: ApiAuth,
) -> FederationResult<ConfigGenParamsRequest>;
async fn set_config_gen_params(
&self,
requested: ConfigGenParamsRequest,
auth: ApiAuth,
) -> FederationResult<()>;
async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse>;
async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
async fn get_verify_config_hash(
&self,
auth: ApiAuth,
) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
async fn verified_configs(
&self,
auth: ApiAuth,
) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
async fn status(&self) -> FederationResult<StatusResponse>;
async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
async fn guardian_config_backup(&self, auth: ApiAuth)
-> FederationResult<GuardianConfigBackup>;
async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
async fn submit_api_announcement(
&self,
peer_id: PeerId,
announcement: SignedApiAnnouncement,
) -> FederationResult<()>;
async fn api_announcements(
&self,
guardian: PeerId,
) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
async fn sign_api_announcement(
&self,
api_url: SafeUrl,
auth: ApiAuth,
) -> FederationResult<SignedApiAnnouncement>;
async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
}
pub fn deserialize_outcome<R>(
outcome: &SerdeOutputOutcome,
module_decoder: &Decoder,
) -> OutputOutcomeResult<R>
where
R: OutputOutcome + MaybeSend,
{
let dyn_outcome = outcome
.try_into_inner_known_module_kind(module_decoder)
.map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
let source_instance = dyn_outcome.module_instance_id();
dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
let target_type = std::any::type_name::<R>();
OutputOutcomeError::ResponseDeserialization(anyhow!(
"Could not downcast output outcome with instance id {source_instance} to {target_type}"
))
})
}
#[derive(Debug, Clone)]
pub struct WsFederationApi<C = WsClient> {
peer_ids: BTreeSet<PeerId>,
self_peer_id: Option<PeerId>,
peers: Arc<Vec<FederationPeer<C>>>,
module_id: Option<ModuleInstanceId>,
}
impl<C: JsonRpcClient + Debug + 'static> IModuleFederationApi for WsFederationApi<C> {}
#[apply(async_trait_maybe_send!)]
impl<C: JsonRpcClient + Debug + 'static> IRawFederationApi for WsFederationApi<C> {
fn all_peers(&self) -> &BTreeSet<PeerId> {
&self.peer_ids
}
fn self_peer(&self) -> Option<PeerId> {
self.self_peer_id
}
fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
WsFederationApi {
peer_ids: self.peer_ids.clone(),
peers: self.peers.clone(),
module_id: Some(id),
self_peer_id: self.self_peer_id,
}
.into()
}
async fn request_raw(
&self,
peer_id: PeerId,
method: &str,
params: &[Value],
) -> JsonRpcResult<Value> {
let peer = self
.peers
.iter()
.find(|m| m.peer_id == peer_id)
.ok_or_else(|| JsonRpcClientError::Custom(format!("Invalid peer_id: {peer_id}")))?;
let method = match self.module_id {
None => method.to_string(),
Some(id) => format!("module_{id}_{method}"),
};
peer.request(&method, params).await
}
}
#[apply(async_trait_maybe_send!)]
pub trait JsonRpcClient: ClientT + Sized + MaybeSend + MaybeSync {
async fn connect(
url: &SafeUrl,
api_secret: Option<String>,
) -> result::Result<Self, JsonRpcClientError>;
fn is_connected(&self) -> bool;
}
#[apply(async_trait_maybe_send!)]
impl JsonRpcClient for WsClient {
async fn connect(
url: &SafeUrl,
api_secret: Option<String>,
) -> result::Result<Self, JsonRpcClientError> {
#[cfg(not(target_family = "wasm"))]
let mut client = {
let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
let mut root_certs = RootCertStore::empty();
root_certs.extend(webpki_roots);
let tls_cfg = CustomCertStore::builder()
.with_root_certificates(root_certs)
.with_no_client_auth();
WsClientBuilder::default()
.max_concurrent_requests(u16::MAX as usize)
.with_custom_cert_store(tls_cfg)
};
#[cfg(target_family = "wasm")]
let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
if let Some(api_secret) = api_secret {
#[cfg(not(target_family = "wasm"))]
{
let mut headers = HeaderMap::new();
let auth = base64::engine::general_purpose::STANDARD
.encode(format!("fedimint:{api_secret}"));
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
);
client = client.set_headers(headers);
}
#[cfg(target_family = "wasm")]
{
let mut url = url.clone();
url.set_username("fedimint").map_err(|_| {
JsonRpcClientError::Transport(anyhow::format_err!("invalid username").into())
})?;
url.set_password(Some(&api_secret)).map_err(|_| {
JsonRpcClientError::Transport(anyhow::format_err!("invalid secret").into())
})?;
return client.build(url.as_str()).await;
}
}
client.build(url.as_str()).await
}
fn is_connected(&self) -> bool {
self.is_connected()
}
}
impl WsFederationApi<WsClient> {
pub fn new(
peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
api_secret: &Option<String>,
) -> Self {
Self::new_with_client(peers, None, api_secret)
}
pub fn with_self_peer_id(self, self_peer_id: PeerId) -> Self {
Self {
self_peer_id: Some(self_peer_id),
..self
}
}
}
impl<C> WsFederationApi<C>
where
C: JsonRpcClient + 'static,
{
pub fn peers(&self) -> Vec<PeerId> {
self.peers.iter().map(|peer| peer.peer_id).collect()
}
pub fn new_with_client(
peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
self_peer_id: Option<PeerId>,
api_secret: &Option<String>,
) -> Self {
let (peer_connections, peer_ids) = peers
.into_iter()
.map(|(peer_id, url)| {
assert!(
url.port_or_known_default().is_some(),
"API client requires a port"
);
assert!(url.host().is_some(), "API client requires a target host");
(
FederationPeer::new(url, peer_id, api_secret.clone()),
peer_id,
)
})
.unzip();
WsFederationApi {
peer_ids,
self_peer_id,
peers: Arc::new(peer_connections),
module_id: None,
}
}
}
#[derive(Debug)]
pub struct PeerResponse<R> {
pub peer: PeerId,
pub result: JsonRpcResult<R>,
}
impl<C> FederationPeer<C>
where
C: JsonRpcClient + 'static,
{
#[instrument(level = "trace", fields(peer = %self.peer_id, %method), skip_all)]
pub async fn request(&self, method: &str, params: &[Value]) -> JsonRpcResult<Value> {
for attempts in 0.. {
debug_assert!(attempts <= 1);
let rclient = self.client.read().await;
match rclient.client.get_try().await {
Ok(client) if client.is_connected() => {
return client.request::<_, _>(method, params).await;
}
Err(e) => {
if 0 < attempts {
return Err(JsonRpcClientError::Transport(e.into()));
}
debug!(target: LOG_CLIENT_NET_API, err=%e, "Triggering reconnection after connection error");
}
Ok(_client) => {
if 0 < attempts {
return Err(JsonRpcClientError::Transport(
anyhow::format_err!("Disconnected").into(),
));
}
debug!(target: LOG_CLIENT_NET_API, "Triggering reconnection after disconnection");
}
};
drop(rclient);
let mut wclient = self.client.write().await;
match wclient.client.get_try().await {
Ok(client) if client.is_connected() => {
trace!(target: LOG_CLIENT_NET_API, "Some other request reconnected client, retrying");
}
_ => {
wclient.reconnect(self.peer_id, self.url.clone(), self.api_secret.clone());
}
}
}
unreachable!();
}
}
impl<C: JsonRpcClient> WsFederationApi<C> {}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct FederationStatus {
pub session_count: u64,
pub status_by_peer: HashMap<PeerId, PeerStatus>,
pub peers_online: u64,
pub peers_offline: u64,
pub peers_flagged: u64,
pub scheduled_shutdown: Option<u64>,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerStatus {
pub last_contribution: Option<u64>,
pub connection_status: PeerConnectionStatus,
pub flagged: bool,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerConnectionStatus {
#[default]
Disconnected,
Connected,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
pub server: ServerStatus,
pub federation: Option<FederationStatus>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
#[serde(with = "fedimint_core::hex::serde")]
pub tar_archive_bytes: Vec<u8>,
}
#[cfg(test)]
mod tests {
use std::str::FromStr as _;
use fedimint_core::config::FederationId;
use jsonrpsee_core::client::BatchResponse;
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use super::*;
type Result<T = ()> = std::result::Result<T, JsonRpcClientError>;
#[apply(async_trait_maybe_send!)]
trait SimpleClient: Sized {
async fn connect() -> Result<Self>;
fn is_connected(&self) -> bool {
true
}
async fn request(&self, method: &str) -> Result<String>;
}
struct Client<C: SimpleClient>(C);
#[apply(async_trait_maybe_send!)]
impl<C: SimpleClient + MaybeSend + MaybeSync> JsonRpcClient for Client<C> {
fn is_connected(&self) -> bool {
self.0.is_connected()
}
async fn connect(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
Ok(Self(C::connect().await?))
}
}
#[apply(async_trait_maybe_send!)]
impl<C: SimpleClient + MaybeSend + MaybeSync> ClientT for Client<C> {
async fn request<R, P>(&self, method: &str, _params: P) -> Result<R>
where
R: jsonrpsee_core::DeserializeOwned,
P: ToRpcParams + MaybeSend,
{
let json = self.0.request(method).await?;
Ok(serde_json::from_str(&json).unwrap())
}
async fn notification<P>(&self, _method: &str, _params: P) -> Result<()>
where
P: ToRpcParams + MaybeSend,
{
unimplemented!()
}
async fn batch_request<'a, R>(
&self,
_batch: BatchRequestBuilder<'a>,
) -> std::result::Result<BatchResponse<'a, R>, jsonrpsee_core::client::Error>
where
R: DeserializeOwned + fmt::Debug + 'a,
{
unimplemented!()
}
}
#[test]
fn converts_invite_code() {
let connect = InviteCode::new(
"ws://test1".parse().unwrap(),
PeerId::from(1),
FederationId::dummy(),
Some("api_secret".into()),
);
let bech32 = connect.to_string();
let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
assert_eq!(connect, connect_parsed);
let json = serde_json::to_string(&connect).unwrap();
let connect_as_string: String = serde_json::from_str(&json).unwrap();
assert_eq!(connect_as_string, bech32);
let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
assert_eq!(connect_parsed_json, connect_parsed);
}
#[test]
fn creates_essential_guardians_invite_code() {
let mut peer_to_url_map = BTreeMap::new();
peer_to_url_map.insert(PeerId::from(0), "ws://test1".parse().expect("URL fail"));
peer_to_url_map.insert(PeerId::from(1), "ws://test2".parse().expect("URL fail"));
peer_to_url_map.insert(PeerId::from(2), "ws://test3".parse().expect("URL fail"));
peer_to_url_map.insert(PeerId::from(3), "ws://test4".parse().expect("URL fail"));
let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;
let code =
InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
assert_eq!(FederationId::dummy(), code.federation_id());
let expected_map: BTreeMap<PeerId, SafeUrl> =
peer_to_url_map.into_iter().take(max_size).collect();
assert_eq!(expected_map, code.peers());
}
}