fedimint_client/
meta.rs

1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use anyhow::{bail, Context as _};
7use async_stream::stream;
8use fedimint_api_client::api::DynGlobalApi;
9use fedimint_core::config::ClientConfig;
10use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
11use fedimint_core::task::waiter::Waiter;
12use fedimint_core::task::{MaybeSend, MaybeSync};
13use fedimint_core::util::{backoff_util, retry, FmtCompactAnyhow as _};
14use fedimint_core::{apply, async_trait_maybe_send};
15use fedimint_logging::LOG_CLIENT;
16use serde::de::DeserializeOwned;
17use serde::{Deserialize, Serialize};
18use tokio::sync::Notify;
19use tokio_stream::{Stream, StreamExt as _};
20use tracing::{debug, instrument, warn};
21
22use crate::db::{
23    MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
24};
25use crate::Client;
26
27#[apply(async_trait_maybe_send!)]
28pub trait MetaSource: MaybeSend + MaybeSync + 'static {
29    /// Wait for next change in this source.
30    async fn wait_for_update(&self);
31    async fn fetch(
32        &self,
33        client_config: &ClientConfig,
34        api: &DynGlobalApi,
35        fetch_kind: FetchKind,
36        last_revision: Option<u64>,
37    ) -> anyhow::Result<MetaValues>;
38}
39
/// Tells a [`MetaSource`] how patient it is allowed to be while fetching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchKind {
    /// First fetch after start-up.
    ///
    /// Reading any meta value is blocked on this fetch, so the source should
    /// return fast and retry less.
    Initial,
    /// Periodic refresh running in the background; the source can retry
    /// infinitely.
    Background,
}
48
49#[derive(Debug, Clone, Default, Serialize, Deserialize)]
50pub struct MetaValues {
51    pub values: BTreeMap<MetaFieldKey, MetaFieldValue>,
52    pub revision: u64,
53}
54
/// A single decoded meta field together with the time it was fetched.
#[derive(Debug, Clone, Copy)]
pub struct MetaValue<T> {
    /// Time at which the cached meta values were last updated.
    pub fetch_time: SystemTime,
    /// Decoded field value, or `None` when no usable value is available.
    pub value: Option<T>,
}
60
61/// Service for managing the caching of meta fields.
62// a fancy DST to save one allocation.
63pub struct MetaService<S: ?Sized = dyn MetaSource> {
64    initial_fetch_waiter: Waiter,
65    meta_update_notify: Notify,
66    source: S,
67}
68
69impl<S: MetaSource + ?Sized> MetaService<S> {
70    pub fn new(source: S) -> Arc<MetaService>
71    where
72        S: Sized,
73    {
74        // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>`
75        Arc::new(MetaService {
76            initial_fetch_waiter: Waiter::new(),
77            meta_update_notify: Notify::new(),
78            source,
79        })
80    }
81
82    /// Get the value for the meta field.
83    ///
84    /// This may wait for significant time on first run.
85    pub async fn get_field<V: DeserializeOwned + 'static>(
86        &self,
87        db: &Database,
88        field: &str,
89    ) -> Option<MetaValue<V>> {
90        if let Some(value) = self.get_field_from_db(db, field).await {
91            // might be from in old cache.
92            // TODO: maybe old cache should have a ttl?
93            Some(value)
94        } else {
95            // wait for initial value
96            self.initial_fetch_waiter.wait().await;
97            self.get_field_from_db(db, field).await
98        }
99    }
100
101    async fn get_field_from_db<V: DeserializeOwned + 'static>(
102        &self,
103        db: &Database,
104        field: &str,
105    ) -> Option<MetaValue<V>> {
106        let dbtx = &mut db.begin_transaction_nc().await;
107        let info = dbtx.get_value(&MetaServiceInfoKey).await?;
108        let value = dbtx
109            .get_value(&MetaFieldKey(field.to_string()))
110            .await
111            .and_then(|value| parse_meta_value_static::<V>(&value.0).ok());
112        Some(MetaValue {
113            fetch_time: info.last_updated,
114            value,
115        })
116    }
117
118    async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
119        dbtx.get_value(&MetaServiceInfoKey)
120            .await
121            .map(|x| x.revision)
122    }
123
124    /// Wait until Meta Service is initialized, after this `get_field` will not
125    /// block.
126    pub async fn wait_initialization(&self) {
127        self.initial_fetch_waiter.wait().await;
128    }
129
130    /// NOTE: this subscription never ends even after update task is shutdown.
131    /// You should consume this stream in a spawn_cancellable.
132    pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
133        stream! {
134            let mut notify = pin!(self.meta_update_notify.notified());
135            loop {
136                notify.as_mut().await;
137                notify.set(self.meta_update_notify.notified());
138                // enable waiting for next notification before yield so don't miss
139                // any notifications.
140                notify.as_mut().enable();
141                yield ();
142            }
143        }
144    }
145
146    /// NOTE: this subscription never ends even after update task is shutdown.
147    /// You should consume this stream in a spawn_cancellable.
148    ///
149    /// Stream will yield the first element immediately without blocking.
150    /// The first element will be initial value of the field.
151    ///
152    /// This may yield an outdated initial value if you didn't call
153    /// [`Self::wait_initialization`].
154    pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
155        &'a self,
156        db: &'a Database,
157        name: &'a str,
158    ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
159        stream! {
160            let mut update_stream = pin!(self.subscribe_to_updates());
161            loop {
162                let value = self.get_field_from_db(db, name).await;
163                yield value;
164                if update_stream.next().await.is_none() {
165                    break;
166                }
167            }
168        }
169    }
170
171    /// Update all source in background.
172    ///
173    /// Caller should run this method in a task.
174    pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
175        let mut current_revision = self
176            .current_revision(&mut client.db().begin_transaction_nc().await)
177            .await;
178        let client_config = client.config().await;
179        let meta_values = self
180            .source
181            .fetch(
182                &client_config,
183                &client.api,
184                FetchKind::Initial,
185                current_revision,
186            )
187            .await;
188        let failed_initial = meta_values.is_err();
189        match meta_values {
190            Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
191            Err(error) => {
192                warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
193            }
194        };
195        self.initial_fetch_waiter.done();
196
197        // don't wait if we failed first item
198        if !failed_initial {
199            self.source.wait_for_update().await;
200        }
201
202        // now keep updating slowly
203        loop {
204            if let Ok(meta_values) = self
205                .source
206                .fetch(
207                    &client_config,
208                    &client.api,
209                    FetchKind::Background,
210                    current_revision,
211                )
212                .await
213            {
214                current_revision = Some(meta_values.revision);
215                self.save_meta_values(client, &meta_values).await;
216            }
217            self.source.wait_for_update().await;
218        }
219    }
220
221    async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
222        let mut dbtx = client.db().begin_transaction().await;
223        dbtx.remove_by_prefix(&MetaFieldPrefix).await;
224        dbtx.insert_entry(
225            &MetaServiceInfoKey,
226            &MetaServiceInfo {
227                last_updated: fedimint_core::time::now(),
228                revision: meta_values.revision,
229            },
230        )
231        .await;
232        for (key, value) in &meta_values.values {
233            dbtx.insert_entry(key, value).await;
234        }
235        dbtx.commit_tx().await;
236        // notify everyone about changes
237        self.meta_update_notify.notify_waiters();
238    }
239}
240
241/// Legacy non-meta module config source uses client config meta and
242/// meta_override_url meta field.
243#[derive(Clone, Debug, Default)]
244#[non_exhaustive]
245pub struct LegacyMetaSource {
246    reqwest: reqwest::Client,
247}
248
249#[apply(async_trait_maybe_send!)]
250impl MetaSource for LegacyMetaSource {
251    async fn wait_for_update(&self) {
252        fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await;
253    }
254
255    async fn fetch(
256        &self,
257        client_config: &ClientConfig,
258        _api: &DynGlobalApi,
259        fetch_kind: FetchKind,
260        last_revision: Option<u64>,
261    ) -> anyhow::Result<MetaValues> {
262        let config_iter = client_config
263            .global
264            .meta
265            .iter()
266            .map(|(key, value)| (MetaFieldKey(key.clone()), MetaFieldValue(value.clone())));
267        let backoff = match fetch_kind {
268            // need to be fast the first time.
269            FetchKind::Initial => backoff_util::aggressive_backoff(),
270            FetchKind::Background => backoff_util::background_backoff(),
271        };
272        let overrides = retry("fetch_meta_overrides", backoff, || {
273            fetch_meta_overrides(&self.reqwest, client_config, "meta_override_url")
274        })
275        .await?;
276        Ok(MetaValues {
277            values: config_iter.chain(overrides).collect(),
278            revision: last_revision.map_or(0, |r| r + 1),
279        })
280    }
281}
282
283pub async fn fetch_meta_overrides(
284    reqwest: &reqwest::Client,
285    client_config: &ClientConfig,
286    field_name: &str,
287) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> {
288    let Some(url) = client_config.meta::<String>(field_name)? else {
289        return Ok(BTreeMap::new());
290    };
291    let response = reqwest
292        .get(&url)
293        .send()
294        .await
295        .context("Meta override source could not be fetched")?;
296
297    debug!("Meta override source returned status: {response:?}");
298
299    if response.status() != reqwest::StatusCode::OK {
300        bail!(
301            "Meta override request returned non-OK status code: {}",
302            response.status()
303        );
304    }
305
306    let mut federation_map = response
307        .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>()
308        .await
309        .context("Meta override could not be parsed as JSON")?;
310
311    let federation_id = client_config.calculate_federation_id().to_string();
312    let meta_fields = federation_map
313        .remove(&federation_id)
314        .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))?
315        .into_iter()
316        .filter_map(|(key, value)| {
317            if let serde_json::Value::String(value_str) = value {
318                Some((MetaFieldKey(key), MetaFieldValue(value_str)))
319            } else {
320                warn!(target: LOG_CLIENT, "Meta override map contained non-string key: {key}, ignoring");
321                None
322            }
323        })
324        .collect::<BTreeMap<_, _>>();
325
326    Ok(meta_fields)
327}
328
329/// Tries to parse `str_value` as JSON. In the special case that `V` is `String`
330/// we return the raw `str_value` if JSON parsing fails. This necessary since
331/// the spec wasn't clear enough in the beginning.
332#[instrument(target = LOG_CLIENT, err)] // log on every failure
333pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
334    str_value: &str,
335) -> anyhow::Result<V> {
336    let res = serde_json::from_str(str_value)
337        .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
338
339    // In the past we encoded some string fields as "just a string" without quotes,
340    // this code ensures that old meta values still parse since config is hard to
341    // change
342    if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
343        let string_ret = Box::new(str_value.to_owned());
344        let ret: Box<V> = unsafe {
345            // We can transmute a String to V because we know that V==String
346            std::mem::transmute(string_ret)
347        };
348        Ok(*ret)
349    } else {
350        res
351    }
352}