fedimint_client/
meta.rs

1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use anyhow::{bail, Context as _};
7use async_stream::stream;
8use fedimint_api_client::api::DynGlobalApi;
9use fedimint_core::config::ClientConfig;
10use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
11use fedimint_core::task::waiter::Waiter;
12use fedimint_core::task::{MaybeSend, MaybeSync};
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{apply, async_trait_maybe_send};
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use tokio::sync::Notify;
18use tokio_stream::{Stream, StreamExt as _};
19use tracing::{debug, instrument, warn};
20
21use crate::db::{
22    MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
23};
24use crate::Client;
25
26#[apply(async_trait_maybe_send!)]
27pub trait MetaSource: MaybeSend + MaybeSync + 'static {
28    /// Wait for next change in this source.
29    async fn wait_for_update(&self);
30    async fn fetch(
31        &self,
32        client_config: &ClientConfig,
33        api: &DynGlobalApi,
34        fetch_kind: FetchKind,
35        last_revision: Option<u64>,
36    ) -> anyhow::Result<MetaValues>;
37}
38
/// How patient a [`MetaSource`] may be while serving a fetch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchKind {
    /// The very first fetch: the source should answer quickly and retry
    /// sparingly, because reading any meta value is blocked until it returns.
    Initial,
    /// A periodic refresh: the source is free to keep retrying forever.
    Background,
}
47
48#[derive(Debug, Clone, Default, Serialize, Deserialize)]
49pub struct MetaValues {
50    pub values: BTreeMap<MetaFieldKey, MetaFieldValue>,
51    pub revision: u64,
52}
53
/// A single meta field as read back from the local cache.
#[derive(Debug, Clone, Copy)]
pub struct MetaValue<T> {
    /// When the cache this value was read from was last updated.
    pub fetch_time: SystemTime,
    /// The parsed value, or `None` when the field is missing or could not be
    /// parsed as `T`.
    pub value: Option<T>,
}
59
60/// Service for managing the caching of meta fields.
61// a fancy DST to save one allocation.
62pub struct MetaService<S: ?Sized = dyn MetaSource> {
63    initial_fetch_waiter: Waiter,
64    meta_update_notify: Notify,
65    source: S,
66}
67
68impl<S: MetaSource + ?Sized> MetaService<S> {
69    pub fn new(source: S) -> Arc<MetaService>
70    where
71        S: Sized,
72    {
73        // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>`
74        Arc::new(MetaService {
75            initial_fetch_waiter: Waiter::new(),
76            meta_update_notify: Notify::new(),
77            source,
78        })
79    }
80
81    /// Get the value for the meta field.
82    ///
83    /// This may wait for significant time on first run.
84    pub async fn get_field<V: DeserializeOwned + 'static>(
85        &self,
86        db: &Database,
87        field: &str,
88    ) -> Option<MetaValue<V>> {
89        if let Some(value) = self.get_field_from_db(db, field).await {
90            // might be from in old cache.
91            // TODO: maybe old cache should have a ttl?
92            Some(value)
93        } else {
94            // wait for initial value
95            self.initial_fetch_waiter.wait().await;
96            self.get_field_from_db(db, field).await
97        }
98    }
99
100    async fn get_field_from_db<V: DeserializeOwned + 'static>(
101        &self,
102        db: &Database,
103        field: &str,
104    ) -> Option<MetaValue<V>> {
105        let dbtx = &mut db.begin_transaction_nc().await;
106        let info = dbtx.get_value(&MetaServiceInfoKey).await?;
107        let value = dbtx
108            .get_value(&MetaFieldKey(field.to_string()))
109            .await
110            .and_then(|value| parse_meta_value_static::<V>(&value.0).ok());
111        Some(MetaValue {
112            fetch_time: info.last_updated,
113            value,
114        })
115    }
116
117    async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
118        dbtx.get_value(&MetaServiceInfoKey)
119            .await
120            .map(|x| x.revision)
121    }
122
123    /// Wait until Meta Service is initialized, after this `get_field` will not
124    /// block.
125    pub async fn wait_initialization(&self) {
126        self.initial_fetch_waiter.wait().await;
127    }
128
129    /// NOTE: this subscription never ends even after update task is shutdown.
130    /// You should consume this stream in a spawn_cancellable.
131    pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
132        stream! {
133            let mut notify = pin!(self.meta_update_notify.notified());
134            loop {
135                notify.as_mut().await;
136                notify.set(self.meta_update_notify.notified());
137                // enable waiting for next notification before yield so don't miss
138                // any notifications.
139                notify.as_mut().enable();
140                yield ();
141            }
142        }
143    }
144
145    /// NOTE: this subscription never ends even after update task is shutdown.
146    /// You should consume this stream in a spawn_cancellable.
147    ///
148    /// Stream will yield the first element immediately without blocking.
149    /// The first element will be initial value of the field.
150    ///
151    /// This may yield an outdated initial value if you didn't call
152    /// [`Self::wait_initialization`].
153    pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
154        &'a self,
155        db: &'a Database,
156        name: &'a str,
157    ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
158        stream! {
159            let mut update_stream = pin!(self.subscribe_to_updates());
160            loop {
161                let value = self.get_field_from_db(db, name).await;
162                yield value;
163                if update_stream.next().await.is_none() {
164                    break;
165                }
166            }
167        }
168    }
169
170    /// Update all source in background.
171    ///
172    /// Caller should run this method in a task.
173    pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
174        let mut current_revision = self
175            .current_revision(&mut client.db().begin_transaction_nc().await)
176            .await;
177        let client_config = client.config().await;
178        let meta_values = self
179            .source
180            .fetch(
181                &client_config,
182                &client.api,
183                FetchKind::Initial,
184                current_revision,
185            )
186            .await;
187        let failed_initial = meta_values.is_err();
188        match meta_values {
189            Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
190            Err(error) => warn!(%error, "failed to fetch source"),
191        };
192        self.initial_fetch_waiter.done();
193
194        // don't wait if we failed first item
195        if !failed_initial {
196            self.source.wait_for_update().await;
197        }
198
199        // now keep updating slowly
200        loop {
201            if let Ok(meta_values) = self
202                .source
203                .fetch(
204                    &client_config,
205                    &client.api,
206                    FetchKind::Background,
207                    current_revision,
208                )
209                .await
210            {
211                current_revision = Some(meta_values.revision);
212                self.save_meta_values(client, &meta_values).await;
213            }
214            self.source.wait_for_update().await;
215        }
216    }
217
218    async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
219        let mut dbtx = client.db().begin_transaction().await;
220        dbtx.remove_by_prefix(&MetaFieldPrefix).await;
221        dbtx.insert_entry(
222            &MetaServiceInfoKey,
223            &MetaServiceInfo {
224                last_updated: fedimint_core::time::now(),
225                revision: meta_values.revision,
226            },
227        )
228        .await;
229        for (key, value) in &meta_values.values {
230            dbtx.insert_entry(key, value).await;
231        }
232        dbtx.commit_tx().await;
233        // notify everyone about changes
234        self.meta_update_notify.notify_waiters();
235    }
236}
237
238/// Legacy non-meta module config source uses client config meta and
239/// meta_override_url meta field.
240#[derive(Clone, Debug, Default)]
241#[non_exhaustive]
242pub struct LegacyMetaSource {
243    reqwest: reqwest::Client,
244}
245
246#[apply(async_trait_maybe_send!)]
247impl MetaSource for LegacyMetaSource {
248    async fn wait_for_update(&self) {
249        fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await;
250    }
251
252    async fn fetch(
253        &self,
254        client_config: &ClientConfig,
255        _api: &DynGlobalApi,
256        fetch_kind: FetchKind,
257        last_revision: Option<u64>,
258    ) -> anyhow::Result<MetaValues> {
259        let config_iter = client_config
260            .global
261            .meta
262            .iter()
263            .map(|(key, value)| (MetaFieldKey(key.clone()), MetaFieldValue(value.clone())));
264        let backoff = match fetch_kind {
265            // need to be fast the first time.
266            FetchKind::Initial => backoff_util::aggressive_backoff(),
267            FetchKind::Background => backoff_util::background_backoff(),
268        };
269        let overrides = retry("fetch_meta_overrides", backoff, || {
270            fetch_meta_overrides(&self.reqwest, client_config, "meta_override_url")
271        })
272        .await?;
273        Ok(MetaValues {
274            values: config_iter.chain(overrides).collect(),
275            revision: last_revision.map_or(0, |r| r + 1),
276        })
277    }
278}
279
280pub async fn fetch_meta_overrides(
281    reqwest: &reqwest::Client,
282    client_config: &ClientConfig,
283    field_name: &str,
284) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> {
285    let Some(url) = client_config.meta::<String>(field_name)? else {
286        return Ok(BTreeMap::new());
287    };
288    let response = reqwest
289        .get(&url)
290        .send()
291        .await
292        .context("Meta override source could not be fetched")?;
293
294    debug!("Meta override source returned status: {response:?}");
295
296    if response.status() != reqwest::StatusCode::OK {
297        bail!(
298            "Meta override request returned non-OK status code: {}",
299            response.status()
300        );
301    }
302
303    let mut federation_map = response
304        .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>()
305        .await
306        .context("Meta override could not be parsed as JSON")?;
307
308    let federation_id = client_config.calculate_federation_id().to_string();
309    let meta_fields = federation_map
310        .remove(&federation_id)
311        .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))?
312        .into_iter()
313        .filter_map(|(key, value)| {
314            if let serde_json::Value::String(value_str) = value {
315                Some((MetaFieldKey(key), MetaFieldValue(value_str)))
316            } else {
317                warn!("Meta override map contained non-string key: {key}, ignoring");
318                None
319            }
320        })
321        .collect::<BTreeMap<_, _>>();
322
323    Ok(meta_fields)
324}
325
326/// Tries to parse `str_value` as JSON. In the special case that `V` is `String`
327/// we return the raw `str_value` if JSON parsing fails. This necessary since
328/// the spec wasn't clear enough in the beginning.
329#[instrument(err)] // log on every failure
330pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
331    str_value: &str,
332) -> anyhow::Result<V> {
333    let res = serde_json::from_str(str_value)
334        .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
335
336    // In the past we encoded some string fields as "just a string" without quotes,
337    // this code ensures that old meta values still parse since config is hard to
338    // change
339    if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
340        let string_ret = Box::new(str_value.to_owned());
341        let ret: Box<V> = unsafe {
342            // We can transmute a String to V because we know that V==String
343            std::mem::transmute(string_ret)
344        };
345        Ok(*ret)
346    } else {
347        res
348    }
349}