1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use anyhow::{bail, Context as _};
7use async_stream::stream;
8use fedimint_api_client::api::DynGlobalApi;
9use fedimint_core::config::ClientConfig;
10use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
11use fedimint_core::task::waiter::Waiter;
12use fedimint_core::task::{MaybeSend, MaybeSync};
13use fedimint_core::util::{backoff_util, retry, FmtCompactAnyhow as _};
14use fedimint_core::{apply, async_trait_maybe_send};
15use fedimint_logging::LOG_CLIENT;
16use serde::de::DeserializeOwned;
17use serde::{Deserialize, Serialize};
18use tokio::sync::Notify;
19use tokio_stream::{Stream, StreamExt as _};
20use tracing::{debug, instrument, warn};
21
22use crate::db::{
23 MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
24};
25use crate::Client;
26
27#[apply(async_trait_maybe_send!)]
28pub trait MetaSource: MaybeSend + MaybeSync + 'static {
29 async fn wait_for_update(&self);
31 async fn fetch(
32 &self,
33 client_config: &ClientConfig,
34 api: &DynGlobalApi,
35 fetch_kind: FetchKind,
36 last_revision: Option<u64>,
37 ) -> anyhow::Result<MetaValues>;
38}
39
/// Tells a [`MetaSource`] in which phase a fetch is being performed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FetchKind {
    /// The first fetch issued by the service; callers of
    /// `MetaService::get_field` may be blocked until it completes.
    Initial,
    /// Any later fetch issued from the service's refresh loop.
    Background,
}
48
49#[derive(Debug, Clone, Default, Serialize, Deserialize)]
50pub struct MetaValues {
51 pub values: BTreeMap<MetaFieldKey, MetaFieldValue>,
52 pub revision: u64,
53}
54
/// A single meta field as read back from the database.
#[derive(Clone, Copy, Debug)]
pub struct MetaValue<T> {
    /// Time at which the stored meta values were last updated.
    pub fetch_time: SystemTime,
    /// The decoded field, or `None` when the field is absent or could not be
    /// decoded as `T`.
    pub value: Option<T>,
}
60
61pub struct MetaService<S: ?Sized = dyn MetaSource> {
64 initial_fetch_waiter: Waiter,
65 meta_update_notify: Notify,
66 source: S,
67}
68
69impl<S: MetaSource + ?Sized> MetaService<S> {
70 pub fn new(source: S) -> Arc<MetaService>
71 where
72 S: Sized,
73 {
74 Arc::new(MetaService {
76 initial_fetch_waiter: Waiter::new(),
77 meta_update_notify: Notify::new(),
78 source,
79 })
80 }
81
82 pub async fn get_field<V: DeserializeOwned + 'static>(
86 &self,
87 db: &Database,
88 field: &str,
89 ) -> Option<MetaValue<V>> {
90 if let Some(value) = self.get_field_from_db(db, field).await {
91 Some(value)
94 } else {
95 self.initial_fetch_waiter.wait().await;
97 self.get_field_from_db(db, field).await
98 }
99 }
100
101 async fn get_field_from_db<V: DeserializeOwned + 'static>(
102 &self,
103 db: &Database,
104 field: &str,
105 ) -> Option<MetaValue<V>> {
106 let dbtx = &mut db.begin_transaction_nc().await;
107 let info = dbtx.get_value(&MetaServiceInfoKey).await?;
108 let value = dbtx
109 .get_value(&MetaFieldKey(field.to_string()))
110 .await
111 .and_then(|value| parse_meta_value_static::<V>(&value.0).ok());
112 Some(MetaValue {
113 fetch_time: info.last_updated,
114 value,
115 })
116 }
117
118 async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
119 dbtx.get_value(&MetaServiceInfoKey)
120 .await
121 .map(|x| x.revision)
122 }
123
124 pub async fn wait_initialization(&self) {
127 self.initial_fetch_waiter.wait().await;
128 }
129
130 pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
133 stream! {
134 let mut notify = pin!(self.meta_update_notify.notified());
135 loop {
136 notify.as_mut().await;
137 notify.set(self.meta_update_notify.notified());
138 notify.as_mut().enable();
141 yield ();
142 }
143 }
144 }
145
146 pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
155 &'a self,
156 db: &'a Database,
157 name: &'a str,
158 ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
159 stream! {
160 let mut update_stream = pin!(self.subscribe_to_updates());
161 loop {
162 let value = self.get_field_from_db(db, name).await;
163 yield value;
164 if update_stream.next().await.is_none() {
165 break;
166 }
167 }
168 }
169 }
170
171 pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
175 let mut current_revision = self
176 .current_revision(&mut client.db().begin_transaction_nc().await)
177 .await;
178 let client_config = client.config().await;
179 let meta_values = self
180 .source
181 .fetch(
182 &client_config,
183 &client.api,
184 FetchKind::Initial,
185 current_revision,
186 )
187 .await;
188 let failed_initial = meta_values.is_err();
189 match meta_values {
190 Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
191 Err(error) => {
192 warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
193 }
194 };
195 self.initial_fetch_waiter.done();
196
197 if !failed_initial {
199 self.source.wait_for_update().await;
200 }
201
202 loop {
204 if let Ok(meta_values) = self
205 .source
206 .fetch(
207 &client_config,
208 &client.api,
209 FetchKind::Background,
210 current_revision,
211 )
212 .await
213 {
214 current_revision = Some(meta_values.revision);
215 self.save_meta_values(client, &meta_values).await;
216 }
217 self.source.wait_for_update().await;
218 }
219 }
220
221 async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
222 let mut dbtx = client.db().begin_transaction().await;
223 dbtx.remove_by_prefix(&MetaFieldPrefix).await;
224 dbtx.insert_entry(
225 &MetaServiceInfoKey,
226 &MetaServiceInfo {
227 last_updated: fedimint_core::time::now(),
228 revision: meta_values.revision,
229 },
230 )
231 .await;
232 for (key, value) in &meta_values.values {
233 dbtx.insert_entry(key, value).await;
234 }
235 dbtx.commit_tx().await;
236 self.meta_update_notify.notify_waiters();
238 }
239}
240
241#[derive(Clone, Debug, Default)]
244#[non_exhaustive]
245pub struct LegacyMetaSource {
246 reqwest: reqwest::Client,
247}
248
249#[apply(async_trait_maybe_send!)]
250impl MetaSource for LegacyMetaSource {
251 async fn wait_for_update(&self) {
252 fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await;
253 }
254
255 async fn fetch(
256 &self,
257 client_config: &ClientConfig,
258 _api: &DynGlobalApi,
259 fetch_kind: FetchKind,
260 last_revision: Option<u64>,
261 ) -> anyhow::Result<MetaValues> {
262 let config_iter = client_config
263 .global
264 .meta
265 .iter()
266 .map(|(key, value)| (MetaFieldKey(key.clone()), MetaFieldValue(value.clone())));
267 let backoff = match fetch_kind {
268 FetchKind::Initial => backoff_util::aggressive_backoff(),
270 FetchKind::Background => backoff_util::background_backoff(),
271 };
272 let overrides = retry("fetch_meta_overrides", backoff, || {
273 fetch_meta_overrides(&self.reqwest, client_config, "meta_override_url")
274 })
275 .await?;
276 Ok(MetaValues {
277 values: config_iter.chain(overrides).collect(),
278 revision: last_revision.map_or(0, |r| r + 1),
279 })
280 }
281}
282
283pub async fn fetch_meta_overrides(
284 reqwest: &reqwest::Client,
285 client_config: &ClientConfig,
286 field_name: &str,
287) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> {
288 let Some(url) = client_config.meta::<String>(field_name)? else {
289 return Ok(BTreeMap::new());
290 };
291 let response = reqwest
292 .get(&url)
293 .send()
294 .await
295 .context("Meta override source could not be fetched")?;
296
297 debug!("Meta override source returned status: {response:?}");
298
299 if response.status() != reqwest::StatusCode::OK {
300 bail!(
301 "Meta override request returned non-OK status code: {}",
302 response.status()
303 );
304 }
305
306 let mut federation_map = response
307 .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>()
308 .await
309 .context("Meta override could not be parsed as JSON")?;
310
311 let federation_id = client_config.calculate_federation_id().to_string();
312 let meta_fields = federation_map
313 .remove(&federation_id)
314 .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))?
315 .into_iter()
316 .filter_map(|(key, value)| {
317 if let serde_json::Value::String(value_str) = value {
318 Some((MetaFieldKey(key), MetaFieldValue(value_str)))
319 } else {
320 warn!(target: LOG_CLIENT, "Meta override map contained non-string key: {key}, ignoring");
321 None
322 }
323 })
324 .collect::<BTreeMap<_, _>>();
325
326 Ok(meta_fields)
327}
328
329#[instrument(target = LOG_CLIENT, err)] pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
334 str_value: &str,
335) -> anyhow::Result<V> {
336 let res = serde_json::from_str(str_value)
337 .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
338
339 if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
343 let string_ret = Box::new(str_value.to_owned());
344 let ret: Box<V> = unsafe {
345 std::mem::transmute(string_ret)
347 };
348 Ok(*ret)
349 } else {
350 res
351 }
352}