fedimint_client/db/
event_log.rs

1//! Client Event Log
2//!
3//! The goal here is to maintain a single, ordered, append only
4//! log of all important client-side events: low or high level,
5//! and move as much of coordination between different parts of
6//! the system in a natural and decomposed way.
7//!
8//! Any event log "follower" can just keep going through
9//! all events and react to ones it is interested in (and understands),
10//! potentially emitting events of its own, and atomically updating persisted
11//! event log position ("cursor") of events that were already processed.
12use std::borrow::Cow;
13use std::str::FromStr;
14use std::sync::atomic::{AtomicU64, Ordering};
15
16use fedimint_core::core::{ModuleInstanceId, ModuleKind};
17use fedimint_core::db::{
18    Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
19    NonCommittable,
20};
21use fedimint_core::encoding::{Decodable, Encodable};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
24use fedimint_logging::LOG_CLIENT_EVENT_LOG;
25use futures::{Future, StreamExt};
26use serde::{Deserialize, Serialize};
27use tokio::sync::{broadcast, watch};
28use tracing::{debug, trace};
29
30use super::DbKeyPrefix;
31
32pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
33    const MODULE: Option<ModuleKind>;
34    const KIND: EventKind;
35    const PERSIST: bool = true;
36}
37
38/// An counter that resets on every restart, that guarantees that
39/// [`UnordedEventLogId`]s don't conflict with each other.
40static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
41
42/// A self-allocated ID that is mostly ordered
43///
44/// The goal here is to avoid concurrent database transaction
45/// conflicts due the ID allocation. Instead they are picked based on
46/// a time and a counter, so they are mostly but not strictly ordered and
47/// monotonic, and even more imporantly: not contiguous.
48#[derive(Debug, Encodable, Decodable)]
49pub struct UnordedEventLogId {
50    ts_usecs: u64,
51    counter: u64,
52}
53
54impl UnordedEventLogId {
55    fn new() -> Self {
56        Self {
57            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
58                // This will never happen
59                .unwrap_or(u64::MAX),
60            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
61        }
62    }
63}
64
65/// Ordered, contiguous ID space, which is easy for event log followers to
66/// track.
67#[derive(
68    Copy,
69    Clone,
70    Debug,
71    Encodable,
72    Decodable,
73    Default,
74    PartialEq,
75    Eq,
76    PartialOrd,
77    Ord,
78    Serialize,
79    Deserialize,
80)]
81pub struct EventLogId(u64);
82
83impl EventLogId {
84    fn next(self) -> EventLogId {
85        Self(self.0 + 1)
86    }
87
88    fn saturating_add(self, rhs: u64) -> EventLogId {
89        Self(self.0.saturating_add(rhs))
90    }
91}
92
93impl FromStr for EventLogId {
94    type Err = <u64 as FromStr>::Err;
95
96    fn from_str(s: &str) -> Result<Self, Self::Err> {
97        u64::from_str(s).map(Self)
98    }
99}
100
101#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
102pub struct EventKind(Cow<'static, str>);
103
104impl EventKind {
105    pub const fn from_static(value: &'static str) -> Self {
106        Self(Cow::Borrowed(value))
107    }
108}
109
110impl<'s> From<&'s str> for EventKind {
111    fn from(value: &'s str) -> Self {
112        Self(Cow::Owned(value.to_owned()))
113    }
114}
115
116impl From<String> for EventKind {
117    fn from(value: String) -> Self {
118        Self(Cow::Owned(value))
119    }
120}
121
122#[derive(Debug, Encodable, Decodable, Clone)]
123pub struct UnorderedEventLogEntry {
124    pub persist: bool,
125    pub inner: EventLogEntry,
126}
127
128#[derive(Debug, Encodable, Decodable, Clone)]
129pub struct EventLogEntry {
130    /// Type/kind of the event
131    ///
132    /// Any part of the client is free to self-allocate identifier, denoting a
133    /// certain kind of an event. Notably one event kind have multiple
134    /// instances. E.g. "successful wallet deposit" can be an event kind,
135    /// and it can happen multiple times with different payloads.
136    pub kind: EventKind,
137
138    /// To prevent accidental conflicts between `kind`s, a module kind the
139    /// given event kind belong is used as well.
140    ///
141    /// Note: the meaning of this field is mostly about which part of the code
142    /// defines this event kind. Oftentime a core (non-module)-defined event
143    /// will refer in some way to a module. It should use a separate `module_id`
144    /// field in the `payload`, instead of this field.
145    pub module: Option<(ModuleKind, ModuleInstanceId)>,
146
147    /// Timestamp in microseconds after unix epoch
148    ts_usecs: u64,
149
150    /// Event-kind specific payload, typically encoded as a json string for
151    /// flexibility.
152    pub payload: Vec<u8>,
153}
154
155impl_db_record!(
156    key = UnordedEventLogId,
157    value = UnorderedEventLogEntry,
158    db_prefix = DbKeyPrefix::UnorderedEventLog,
159);
160
161#[derive(Clone, Debug, Encodable, Decodable)]
162pub struct UnorderedEventLogIdPrefixAll;
163
164impl_db_lookup!(
165    key = UnordedEventLogId,
166    query_prefix = UnorderedEventLogIdPrefixAll
167);
168
169#[derive(Clone, Debug, Encodable, Decodable)]
170pub struct EventLogIdPrefixAll;
171
172#[derive(Clone, Debug, Encodable, Decodable)]
173pub struct EventLogIdPrefix(EventLogId);
174
175impl_db_record!(
176    key = EventLogId,
177    value = EventLogEntry,
178    db_prefix = DbKeyPrefix::EventLog,
179);
180
181impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
182
183impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
184
185#[apply(async_trait_maybe_send!)]
186pub trait DBTransactionEventLogExt {
187    async fn log_event_raw(
188        &mut self,
189        log_ordering_wakeup_tx: watch::Sender<()>,
190        kind: EventKind,
191        module_kind: Option<ModuleKind>,
192        module_id: Option<ModuleInstanceId>,
193        payload: Vec<u8>,
194        persist: bool,
195    );
196
197    /// Log an event log event
198    ///
199    /// The event will start "unordered", but after it is committed an ordering
200    /// task will be notified to "order" it into a final ordered log.
201    async fn log_event<E>(
202        &mut self,
203        log_ordering_wakeup_tx: watch::Sender<()>,
204        module_id: Option<ModuleInstanceId>,
205        event: E,
206    ) where
207        E: Event + Send,
208    {
209        self.log_event_raw(
210            log_ordering_wakeup_tx,
211            E::KIND,
212            E::MODULE,
213            module_id,
214            serde_json::to_vec(&event).expect("Serialization can't fail"),
215            <E as Event>::PERSIST,
216        )
217        .await;
218    }
219
220    /// Next [`EventLogId`] to use for new ordered events.
221    ///
222    /// Used by ordering task, though might be
223    /// useful to get the current count of events.
224    async fn get_next_event_log_id(&mut self) -> EventLogId;
225
226    /// Read a part of the event log.
227    async fn get_event_log(
228        &mut self,
229        pos: Option<EventLogId>,
230        limit: u64,
231    ) -> Vec<(
232        EventLogId,
233        EventKind,
234        Option<(ModuleKind, ModuleInstanceId)>,
235        u64,
236        serde_json::Value,
237    )>;
238}
239
240#[apply(async_trait_maybe_send!)]
241impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
242where
243    Cap: Send,
244{
245    async fn log_event_raw(
246        &mut self,
247        log_ordering_wakeup_tx: watch::Sender<()>,
248        kind: EventKind,
249        module_kind: Option<ModuleKind>,
250        module_id: Option<ModuleInstanceId>,
251        payload: Vec<u8>,
252        persist: bool,
253    ) {
254        assert_eq!(
255            module_kind.is_some(),
256            module_id.is_some(),
257            "Events of modules must have module_id set"
258        );
259
260        let unordered_id = UnordedEventLogId::new();
261        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
262
263        if self
264            .insert_entry(
265                &unordered_id,
266                &UnorderedEventLogEntry {
267                    persist,
268                    inner: EventLogEntry {
269                        kind,
270                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
271                        ts_usecs: unordered_id.ts_usecs,
272                        payload,
273                    },
274                },
275            )
276            .await
277            .is_some()
278        {
279            panic!("Trying to overwrite event in the client event log");
280        }
281        self.on_commit(move || {
282            let _ = log_ordering_wakeup_tx.send(());
283        });
284    }
285
286    async fn get_next_event_log_id(&mut self) -> EventLogId {
287        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
288            .await
289            .next()
290            .await
291            .map(|(k, _v)| k.next())
292            .unwrap_or_default()
293    }
294
295    async fn get_event_log(
296        &mut self,
297        pos: Option<EventLogId>,
298        limit: u64,
299    ) -> Vec<(
300        EventLogId,
301        EventKind,
302        Option<(ModuleKind, ModuleInstanceId)>,
303        u64,
304        serde_json::Value,
305    )> {
306        let pos = pos.unwrap_or_default();
307        self.find_by_range(pos..pos.saturating_add(limit))
308            .await
309            .map(|(k, v)| {
310                (
311                    k,
312                    v.kind,
313                    v.module,
314                    v.ts_usecs,
315                    serde_json::from_slice(&v.payload).unwrap_or_default(),
316                )
317            })
318            .collect()
319            .await
320    }
321}
322
323/// The code that handles new unordered events and rewriters them fully ordered
324/// into the final event log.
325pub(crate) async fn run_event_log_ordering_task(
326    db: Database,
327    mut log_ordering_task_wakeup: watch::Receiver<()>,
328    log_event_added: watch::Sender<()>,
329    log_event_added_transient: broadcast::Sender<EventLogEntry>,
330) {
331    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
332    let mut next_entry_id = db
333        .begin_transaction_nc()
334        .await
335        .get_next_event_log_id()
336        .await;
337
338    loop {
339        let mut dbtx = db.begin_transaction().await;
340
341        let unordered_events = dbtx
342            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
343            .await
344            .collect::<Vec<_>>()
345            .await;
346        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
347
348        for (unordered_id, entry) in &unordered_events {
349            assert!(
350                dbtx.remove_entry(unordered_id).await.is_some(),
351                "Must never fail to remove entry"
352            );
353            if entry.persist {
354                assert!(
355                    dbtx.insert_entry(&next_entry_id, &entry.inner)
356                        .await
357                        .is_none(),
358                    "Must never overwrite existing event"
359                );
360                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
361                next_entry_id = next_entry_id.next();
362            } else {
363                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
364                dbtx.on_commit({
365                    let log_event_added_transient = log_event_added_transient.clone();
366                    let entry = entry.inner.clone();
367
368                    move || {
369                        // we ignore the no-subscribers
370                        let _ = log_event_added_transient.send(entry);
371                    }
372                });
373            }
374        }
375
376        // This thread is the only thread deleting already existing element of unordered
377        // log and inserting new elements into ordered log, so it should never
378        // fail to commit.
379        dbtx.commit_tx().await;
380        if !unordered_events.is_empty() {
381            let _ = log_event_added.send(());
382        }
383
384        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
385        if log_ordering_task_wakeup.changed().await.is_err() {
386            break;
387        }
388    }
389
390    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
391}
392
393pub async fn handle_events<F, R, K>(
394    db: Database,
395    pos_key: &K,
396    mut log_event_added: watch::Receiver<()>,
397    call_fn: F,
398) -> anyhow::Result<()>
399where
400    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
401    K: DatabaseRecord<Value = EventLogId>,
402    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
403    R: Future<Output = anyhow::Result<()>>,
404{
405    let mut next_key: EventLogId = db
406        .begin_transaction_nc()
407        .await
408        .get_value(pos_key)
409        .await
410        .unwrap_or_default();
411
412    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
413
414    loop {
415        let mut dbtx = db.begin_transaction().await;
416
417        if let Some(event) = dbtx.get_value(&next_key).await {
418            (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
419
420            next_key = next_key.next();
421            dbtx.insert_entry(pos_key, &next_key).await;
422
423            dbtx.commit_tx().await;
424        } else if log_event_added.changed().await.is_err() {
425            break Ok(());
426        }
427    }
428}
429
430#[cfg(test)]
431mod tests {
432    use std::sync::atomic::AtomicU8;
433    use std::sync::Arc;
434
435    use anyhow::bail;
436    use fedimint_core::db::mem_impl::MemDatabase;
437    use fedimint_core::db::IRawDatabaseExt as _;
438    use fedimint_core::encoding::{Decodable, Encodable};
439    use fedimint_core::impl_db_record;
440    use fedimint_core::task::TaskGroup;
441    use tokio::sync::{broadcast, watch};
442    use tokio::try_join;
443    use tracing::info;
444
445    use super::{
446        handle_events, run_event_log_ordering_task, DBTransactionEventLogExt as _, EventLogId,
447    };
448    use crate::db::event_log::EventKind;
449
450    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
451    pub struct TestLogIdKey;
452
453    impl_db_record!(key = TestLogIdKey, value = EventLogId, db_prefix = 0x00,);
454
455    #[test_log::test(tokio::test)]
456    async fn sanity_handle_events() {
457        let db = MemDatabase::new().into_database();
458        let tg = TaskGroup::new();
459
460        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
461        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
462        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
463            broadcast::channel(1024);
464
465        tg.spawn_cancellable(
466            "event log ordering task",
467            run_event_log_ordering_task(
468                db.clone(),
469                log_ordering_wakeup_rx,
470                log_event_added_tx,
471                log_event_added_transient_tx,
472            ),
473        );
474
475        let counter = Arc::new(AtomicU8::new(0));
476
477        let _ = try_join!(
478            handle_events(
479                db.clone(),
480                &TestLogIdKey,
481                log_event_added_rx,
482                move |_dbtx, event| {
483                    let counter = counter.clone();
484                    Box::pin(async move {
485                        info!("{event:?}");
486
487                        assert_eq!(
488                            event.kind,
489                            EventKind::from(format!(
490                                "{}",
491                                counter.load(std::sync::atomic::Ordering::Relaxed)
492                            ))
493                        );
494
495                        if counter.load(std::sync::atomic::Ordering::Relaxed) == 4 {
496                            bail!("Time to wrap up");
497                        }
498                        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
499                        Ok(())
500                    })
501                },
502            ),
503            async {
504                for i in 0..=4 {
505                    let mut dbtx = db.begin_transaction().await;
506                    dbtx.log_event_raw(
507                        log_ordering_wakeup_tx.clone(),
508                        EventKind::from(format!("{i}")),
509                        None,
510                        None,
511                        vec![],
512                        true,
513                    )
514                    .await;
515
516                    dbtx.commit_tx().await;
517                }
518
519                Ok(())
520            }
521        );
522    }
523}