1use std::borrow::Cow;
13use std::str::FromStr;
14use std::sync::atomic::{AtomicU64, Ordering};
15
16use fedimint_core::core::{ModuleInstanceId, ModuleKind};
17use fedimint_core::db::{
18 Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
19 NonCommittable,
20};
21use fedimint_core::encoding::{Decodable, Encodable};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::{apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
24use fedimint_logging::LOG_CLIENT_EVENT_LOG;
25use futures::{Future, StreamExt};
26use serde::{Deserialize, Serialize};
27use tokio::sync::{broadcast, watch};
28use tracing::{debug, trace};
29
30use super::DbKeyPrefix;
31
32pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
33 const MODULE: Option<ModuleKind>;
34 const KIND: EventKind;
35 const PERSIST: bool = true;
36}
37
38static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
41
42#[derive(Debug, Encodable, Decodable)]
49pub struct UnordedEventLogId {
50 ts_usecs: u64,
51 counter: u64,
52}
53
54impl UnordedEventLogId {
55 fn new() -> Self {
56 Self {
57 ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
58 .unwrap_or(u64::MAX),
60 counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
61 }
62 }
63}
64
65#[derive(
68 Copy,
69 Clone,
70 Debug,
71 Encodable,
72 Decodable,
73 Default,
74 PartialEq,
75 Eq,
76 PartialOrd,
77 Ord,
78 Serialize,
79 Deserialize,
80)]
81pub struct EventLogId(u64);
82
83impl EventLogId {
84 fn next(self) -> EventLogId {
85 Self(self.0 + 1)
86 }
87
88 fn saturating_add(self, rhs: u64) -> EventLogId {
89 Self(self.0.saturating_add(rhs))
90 }
91}
92
93impl FromStr for EventLogId {
94 type Err = <u64 as FromStr>::Err;
95
96 fn from_str(s: &str) -> Result<Self, Self::Err> {
97 u64::from_str(s).map(Self)
98 }
99}
100
101#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
102pub struct EventKind(Cow<'static, str>);
103
104impl EventKind {
105 pub const fn from_static(value: &'static str) -> Self {
106 Self(Cow::Borrowed(value))
107 }
108}
109
110impl<'s> From<&'s str> for EventKind {
111 fn from(value: &'s str) -> Self {
112 Self(Cow::Owned(value.to_owned()))
113 }
114}
115
116impl From<String> for EventKind {
117 fn from(value: String) -> Self {
118 Self(Cow::Owned(value))
119 }
120}
121
122#[derive(Debug, Encodable, Decodable, Clone)]
123pub struct UnorderedEventLogEntry {
124 pub persist: bool,
125 pub inner: EventLogEntry,
126}
127
128#[derive(Debug, Encodable, Decodable, Clone)]
129pub struct EventLogEntry {
130 pub kind: EventKind,
137
138 pub module: Option<(ModuleKind, ModuleInstanceId)>,
146
147 ts_usecs: u64,
149
150 pub payload: Vec<u8>,
153}
154
155impl_db_record!(
156 key = UnordedEventLogId,
157 value = UnorderedEventLogEntry,
158 db_prefix = DbKeyPrefix::UnorderedEventLog,
159);
160
161#[derive(Clone, Debug, Encodable, Decodable)]
162pub struct UnorderedEventLogIdPrefixAll;
163
164impl_db_lookup!(
165 key = UnordedEventLogId,
166 query_prefix = UnorderedEventLogIdPrefixAll
167);
168
169#[derive(Clone, Debug, Encodable, Decodable)]
170pub struct EventLogIdPrefixAll;
171
172#[derive(Clone, Debug, Encodable, Decodable)]
173pub struct EventLogIdPrefix(EventLogId);
174
175impl_db_record!(
176 key = EventLogId,
177 value = EventLogEntry,
178 db_prefix = DbKeyPrefix::EventLog,
179);
180
181impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
182
183impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
184
185#[apply(async_trait_maybe_send!)]
186pub trait DBTransactionEventLogExt {
187 async fn log_event_raw(
188 &mut self,
189 log_ordering_wakeup_tx: watch::Sender<()>,
190 kind: EventKind,
191 module_kind: Option<ModuleKind>,
192 module_id: Option<ModuleInstanceId>,
193 payload: Vec<u8>,
194 persist: bool,
195 );
196
197 async fn log_event<E>(
202 &mut self,
203 log_ordering_wakeup_tx: watch::Sender<()>,
204 module_id: Option<ModuleInstanceId>,
205 event: E,
206 ) where
207 E: Event + Send,
208 {
209 self.log_event_raw(
210 log_ordering_wakeup_tx,
211 E::KIND,
212 E::MODULE,
213 module_id,
214 serde_json::to_vec(&event).expect("Serialization can't fail"),
215 <E as Event>::PERSIST,
216 )
217 .await;
218 }
219
220 async fn get_next_event_log_id(&mut self) -> EventLogId;
225
226 async fn get_event_log(
228 &mut self,
229 pos: Option<EventLogId>,
230 limit: u64,
231 ) -> Vec<(
232 EventLogId,
233 EventKind,
234 Option<(ModuleKind, ModuleInstanceId)>,
235 u64,
236 serde_json::Value,
237 )>;
238}
239
240#[apply(async_trait_maybe_send!)]
241impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
242where
243 Cap: Send,
244{
245 async fn log_event_raw(
246 &mut self,
247 log_ordering_wakeup_tx: watch::Sender<()>,
248 kind: EventKind,
249 module_kind: Option<ModuleKind>,
250 module_id: Option<ModuleInstanceId>,
251 payload: Vec<u8>,
252 persist: bool,
253 ) {
254 assert_eq!(
255 module_kind.is_some(),
256 module_id.is_some(),
257 "Events of modules must have module_id set"
258 );
259
260 let unordered_id = UnordedEventLogId::new();
261 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
262
263 if self
264 .insert_entry(
265 &unordered_id,
266 &UnorderedEventLogEntry {
267 persist,
268 inner: EventLogEntry {
269 kind,
270 module: module_kind.map(|kind| (kind, module_id.unwrap())),
271 ts_usecs: unordered_id.ts_usecs,
272 payload,
273 },
274 },
275 )
276 .await
277 .is_some()
278 {
279 panic!("Trying to overwrite event in the client event log");
280 }
281 self.on_commit(move || {
282 let _ = log_ordering_wakeup_tx.send(());
283 });
284 }
285
286 async fn get_next_event_log_id(&mut self) -> EventLogId {
287 self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
288 .await
289 .next()
290 .await
291 .map(|(k, _v)| k.next())
292 .unwrap_or_default()
293 }
294
295 async fn get_event_log(
296 &mut self,
297 pos: Option<EventLogId>,
298 limit: u64,
299 ) -> Vec<(
300 EventLogId,
301 EventKind,
302 Option<(ModuleKind, ModuleInstanceId)>,
303 u64,
304 serde_json::Value,
305 )> {
306 let pos = pos.unwrap_or_default();
307 self.find_by_range(pos..pos.saturating_add(limit))
308 .await
309 .map(|(k, v)| {
310 (
311 k,
312 v.kind,
313 v.module,
314 v.ts_usecs,
315 serde_json::from_slice(&v.payload).unwrap_or_default(),
316 )
317 })
318 .collect()
319 .await
320 }
321}
322
323pub(crate) async fn run_event_log_ordering_task(
326 db: Database,
327 mut log_ordering_task_wakeup: watch::Receiver<()>,
328 log_event_added: watch::Sender<()>,
329 log_event_added_transient: broadcast::Sender<EventLogEntry>,
330) {
331 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
332 let mut next_entry_id = db
333 .begin_transaction_nc()
334 .await
335 .get_next_event_log_id()
336 .await;
337
338 loop {
339 let mut dbtx = db.begin_transaction().await;
340
341 let unordered_events = dbtx
342 .find_by_prefix(&UnorderedEventLogIdPrefixAll)
343 .await
344 .collect::<Vec<_>>()
345 .await;
346 trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
347
348 for (unordered_id, entry) in &unordered_events {
349 assert!(
350 dbtx.remove_entry(unordered_id).await.is_some(),
351 "Must never fail to remove entry"
352 );
353 if entry.persist {
354 assert!(
355 dbtx.insert_entry(&next_entry_id, &entry.inner)
356 .await
357 .is_none(),
358 "Must never overwrite existing event"
359 );
360 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
361 next_entry_id = next_entry_id.next();
362 } else {
363 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
364 dbtx.on_commit({
365 let log_event_added_transient = log_event_added_transient.clone();
366 let entry = entry.inner.clone();
367
368 move || {
369 let _ = log_event_added_transient.send(entry);
371 }
372 });
373 }
374 }
375
376 dbtx.commit_tx().await;
380 if !unordered_events.is_empty() {
381 let _ = log_event_added.send(());
382 }
383
384 trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
385 if log_ordering_task_wakeup.changed().await.is_err() {
386 break;
387 }
388 }
389
390 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
391}
392
393pub async fn handle_events<F, R, K>(
394 db: Database,
395 pos_key: &K,
396 mut log_event_added: watch::Receiver<()>,
397 call_fn: F,
398) -> anyhow::Result<()>
399where
400 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
401 K: DatabaseRecord<Value = EventLogId>,
402 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
403 R: Future<Output = anyhow::Result<()>>,
404{
405 let mut next_key: EventLogId = db
406 .begin_transaction_nc()
407 .await
408 .get_value(pos_key)
409 .await
410 .unwrap_or_default();
411
412 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
413
414 loop {
415 let mut dbtx = db.begin_transaction().await;
416
417 if let Some(event) = dbtx.get_value(&next_key).await {
418 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
419
420 next_key = next_key.next();
421 dbtx.insert_entry(pos_key, &next_key).await;
422
423 dbtx.commit_tx().await;
424 } else if log_event_added.changed().await.is_err() {
425 break Ok(());
426 }
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use std::sync::atomic::AtomicU8;
433 use std::sync::Arc;
434
435 use anyhow::bail;
436 use fedimint_core::db::mem_impl::MemDatabase;
437 use fedimint_core::db::IRawDatabaseExt as _;
438 use fedimint_core::encoding::{Decodable, Encodable};
439 use fedimint_core::impl_db_record;
440 use fedimint_core::task::TaskGroup;
441 use tokio::sync::{broadcast, watch};
442 use tokio::try_join;
443 use tracing::info;
444
445 use super::{
446 handle_events, run_event_log_ordering_task, DBTransactionEventLogExt as _, EventLogId,
447 };
448 use crate::db::event_log::EventKind;
449
450 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
451 pub struct TestLogIdKey;
452
453 impl_db_record!(key = TestLogIdKey, value = EventLogId, db_prefix = 0x00,);
454
455 #[test_log::test(tokio::test)]
456 async fn sanity_handle_events() {
457 let db = MemDatabase::new().into_database();
458 let tg = TaskGroup::new();
459
460 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
461 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
462 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
463 broadcast::channel(1024);
464
465 tg.spawn_cancellable(
466 "event log ordering task",
467 run_event_log_ordering_task(
468 db.clone(),
469 log_ordering_wakeup_rx,
470 log_event_added_tx,
471 log_event_added_transient_tx,
472 ),
473 );
474
475 let counter = Arc::new(AtomicU8::new(0));
476
477 let _ = try_join!(
478 handle_events(
479 db.clone(),
480 &TestLogIdKey,
481 log_event_added_rx,
482 move |_dbtx, event| {
483 let counter = counter.clone();
484 Box::pin(async move {
485 info!("{event:?}");
486
487 assert_eq!(
488 event.kind,
489 EventKind::from(format!(
490 "{}",
491 counter.load(std::sync::atomic::Ordering::Relaxed)
492 ))
493 );
494
495 if counter.load(std::sync::atomic::Ordering::Relaxed) == 4 {
496 bail!("Time to wrap up");
497 }
498 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
499 Ok(())
500 })
501 },
502 ),
503 async {
504 for i in 0..=4 {
505 let mut dbtx = db.begin_transaction().await;
506 dbtx.log_event_raw(
507 log_ordering_wakeup_tx.clone(),
508 EventKind::from(format!("{i}")),
509 None,
510 None,
511 vec![],
512 true,
513 )
514 .await;
515
516 dbtx.commit_tx().await;
517 }
518
519 Ok(())
520 }
521 );
522 }
523}