1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use anyhow::{bail, Context as _};
7use async_stream::stream;
8use fedimint_api_client::api::DynGlobalApi;
9use fedimint_core::config::ClientConfig;
10use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
11use fedimint_core::task::waiter::Waiter;
12use fedimint_core::task::{MaybeSend, MaybeSync};
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{apply, async_trait_maybe_send};
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use tokio::sync::Notify;
18use tokio_stream::{Stream, StreamExt as _};
19use tracing::{debug, instrument, warn};
20
21use crate::db::{
22 MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
23};
24use crate::Client;
25
26#[apply(async_trait_maybe_send!)]
27pub trait MetaSource: MaybeSend + MaybeSync + 'static {
28 async fn wait_for_update(&self);
30 async fn fetch(
31 &self,
32 client_config: &ClientConfig,
33 api: &DynGlobalApi,
34 fetch_kind: FetchKind,
35 last_revision: Option<u64>,
36 ) -> anyhow::Result<MetaValues>;
37}
38
/// Tells a [`MetaSource`] in which phase a fetch is being performed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchKind {
    /// The very first fetch issued by [`MetaService::update_continuously`].
    ///
    /// Callers waiting in [`MetaService::wait_initialization`] are only
    /// released once this fetch has finished.
    Initial,
    /// Any later refresh performed from the background update loop.
    Background,
}
47
48#[derive(Debug, Clone, Default, Serialize, Deserialize)]
49pub struct MetaValues {
50 pub values: BTreeMap<MetaFieldKey, MetaFieldValue>,
51 pub revision: u64,
52}
53
/// The result of looking up a single meta field.
#[derive(Debug, Clone, Copy)]
pub struct MetaValue<T> {
    /// Time at which the stored meta values were last updated.
    pub fetch_time: SystemTime,
    /// The decoded field, or `None` when the field is absent or could not be
    /// decoded as `T`.
    pub value: Option<T>,
}
59
60pub struct MetaService<S: ?Sized = dyn MetaSource> {
63 initial_fetch_waiter: Waiter,
64 meta_update_notify: Notify,
65 source: S,
66}
67
68impl<S: MetaSource + ?Sized> MetaService<S> {
69 pub fn new(source: S) -> Arc<MetaService>
70 where
71 S: Sized,
72 {
73 Arc::new(MetaService {
75 initial_fetch_waiter: Waiter::new(),
76 meta_update_notify: Notify::new(),
77 source,
78 })
79 }
80
81 pub async fn get_field<V: DeserializeOwned + 'static>(
85 &self,
86 db: &Database,
87 field: &str,
88 ) -> Option<MetaValue<V>> {
89 if let Some(value) = self.get_field_from_db(db, field).await {
90 Some(value)
93 } else {
94 self.initial_fetch_waiter.wait().await;
96 self.get_field_from_db(db, field).await
97 }
98 }
99
100 async fn get_field_from_db<V: DeserializeOwned + 'static>(
101 &self,
102 db: &Database,
103 field: &str,
104 ) -> Option<MetaValue<V>> {
105 let dbtx = &mut db.begin_transaction_nc().await;
106 let info = dbtx.get_value(&MetaServiceInfoKey).await?;
107 let value = dbtx
108 .get_value(&MetaFieldKey(field.to_string()))
109 .await
110 .and_then(|value| parse_meta_value_static::<V>(&value.0).ok());
111 Some(MetaValue {
112 fetch_time: info.last_updated,
113 value,
114 })
115 }
116
117 async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
118 dbtx.get_value(&MetaServiceInfoKey)
119 .await
120 .map(|x| x.revision)
121 }
122
123 pub async fn wait_initialization(&self) {
126 self.initial_fetch_waiter.wait().await;
127 }
128
129 pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
132 stream! {
133 let mut notify = pin!(self.meta_update_notify.notified());
134 loop {
135 notify.as_mut().await;
136 notify.set(self.meta_update_notify.notified());
137 notify.as_mut().enable();
140 yield ();
141 }
142 }
143 }
144
145 pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
154 &'a self,
155 db: &'a Database,
156 name: &'a str,
157 ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
158 stream! {
159 let mut update_stream = pin!(self.subscribe_to_updates());
160 loop {
161 let value = self.get_field_from_db(db, name).await;
162 yield value;
163 if update_stream.next().await.is_none() {
164 break;
165 }
166 }
167 }
168 }
169
170 pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
174 let mut current_revision = self
175 .current_revision(&mut client.db().begin_transaction_nc().await)
176 .await;
177 let client_config = client.config().await;
178 let meta_values = self
179 .source
180 .fetch(
181 &client_config,
182 &client.api,
183 FetchKind::Initial,
184 current_revision,
185 )
186 .await;
187 let failed_initial = meta_values.is_err();
188 match meta_values {
189 Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
190 Err(error) => warn!(%error, "failed to fetch source"),
191 };
192 self.initial_fetch_waiter.done();
193
194 if !failed_initial {
196 self.source.wait_for_update().await;
197 }
198
199 loop {
201 if let Ok(meta_values) = self
202 .source
203 .fetch(
204 &client_config,
205 &client.api,
206 FetchKind::Background,
207 current_revision,
208 )
209 .await
210 {
211 current_revision = Some(meta_values.revision);
212 self.save_meta_values(client, &meta_values).await;
213 }
214 self.source.wait_for_update().await;
215 }
216 }
217
218 async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
219 let mut dbtx = client.db().begin_transaction().await;
220 dbtx.remove_by_prefix(&MetaFieldPrefix).await;
221 dbtx.insert_entry(
222 &MetaServiceInfoKey,
223 &MetaServiceInfo {
224 last_updated: fedimint_core::time::now(),
225 revision: meta_values.revision,
226 },
227 )
228 .await;
229 for (key, value) in &meta_values.values {
230 dbtx.insert_entry(key, value).await;
231 }
232 dbtx.commit_tx().await;
233 self.meta_update_notify.notify_waiters();
235 }
236}
237
238#[derive(Clone, Debug, Default)]
241#[non_exhaustive]
242pub struct LegacyMetaSource {
243 reqwest: reqwest::Client,
244}
245
246#[apply(async_trait_maybe_send!)]
247impl MetaSource for LegacyMetaSource {
248 async fn wait_for_update(&self) {
249 fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await;
250 }
251
252 async fn fetch(
253 &self,
254 client_config: &ClientConfig,
255 _api: &DynGlobalApi,
256 fetch_kind: FetchKind,
257 last_revision: Option<u64>,
258 ) -> anyhow::Result<MetaValues> {
259 let config_iter = client_config
260 .global
261 .meta
262 .iter()
263 .map(|(key, value)| (MetaFieldKey(key.clone()), MetaFieldValue(value.clone())));
264 let backoff = match fetch_kind {
265 FetchKind::Initial => backoff_util::aggressive_backoff(),
267 FetchKind::Background => backoff_util::background_backoff(),
268 };
269 let overrides = retry("fetch_meta_overrides", backoff, || {
270 fetch_meta_overrides(&self.reqwest, client_config, "meta_override_url")
271 })
272 .await?;
273 Ok(MetaValues {
274 values: config_iter.chain(overrides).collect(),
275 revision: last_revision.map_or(0, |r| r + 1),
276 })
277 }
278}
279
280pub async fn fetch_meta_overrides(
281 reqwest: &reqwest::Client,
282 client_config: &ClientConfig,
283 field_name: &str,
284) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> {
285 let Some(url) = client_config.meta::<String>(field_name)? else {
286 return Ok(BTreeMap::new());
287 };
288 let response = reqwest
289 .get(&url)
290 .send()
291 .await
292 .context("Meta override source could not be fetched")?;
293
294 debug!("Meta override source returned status: {response:?}");
295
296 if response.status() != reqwest::StatusCode::OK {
297 bail!(
298 "Meta override request returned non-OK status code: {}",
299 response.status()
300 );
301 }
302
303 let mut federation_map = response
304 .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>()
305 .await
306 .context("Meta override could not be parsed as JSON")?;
307
308 let federation_id = client_config.calculate_federation_id().to_string();
309 let meta_fields = federation_map
310 .remove(&federation_id)
311 .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))?
312 .into_iter()
313 .filter_map(|(key, value)| {
314 if let serde_json::Value::String(value_str) = value {
315 Some((MetaFieldKey(key), MetaFieldValue(value_str)))
316 } else {
317 warn!("Meta override map contained non-string key: {key}, ignoring");
318 None
319 }
320 })
321 .collect::<BTreeMap<_, _>>();
322
323 Ok(meta_fields)
324}
325
326#[instrument(err)] pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
331 str_value: &str,
332) -> anyhow::Result<V> {
333 let res = serde_json::from_str(str_value)
334 .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
335
336 if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
340 let string_ret = Box::new(str_value.to_owned());
341 let ret: Box<V> = unsafe {
342 std::mem::transmute(string_ret)
344 };
345 Ok(*ret)
346 } else {
347 res
348 }
349}