use std::collections::HashSet;
use std::fmt::Debug;
use std::future;
use std::ops::Range;
use std::time::{Duration, SystemTime};

use async_stream::stream;
use fedimint_core::core::OperationId;
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::time::now;
use fedimint_core::util::BoxStream;
use fedimint_logging::LOG_CLIENT;
use futures::{stream, Stream, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tracing::{error, instrument, warn};

use crate::db::{
    ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey,
};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct JsonStringed(pub serde_json::Value);

impl Encodable for JsonStringed {
    fn consensus_encode<W: std::io::Write>(
        &self,
        writer: &mut W,
    ) -> Result<usize, std::io::Error> {
        let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
        json_str.consensus_encode(writer)
    }
}

impl Decodable for JsonStringed {
    fn consensus_decode_partial<R: std::io::Read>(
        r: &mut R,
        modules: &ModuleDecoderRegistry,
    ) -> Result<Self, DecodeError> {
        let json_str = String::consensus_decode_partial(r, modules)?;
        let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
        Ok(JsonStringed(value))
    }
}

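/// Log of all operations the client has performed, indexed by operation id
/// and, via a secondary index, by creation time for reverse-chronological
/// pagination. The key of the oldest entry is cached to bound pagination
/// scans.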
#[derive(Debug, Clone)]
pub struct OperationLog {
    db: Database,
    oldest_entry: OnceCell<ChronologicalOperationLogKey>,
}

impl OperationLog {
    pub fn new(db: Database) -> Self {
        Self {
            db,
            oldest_entry: OnceCell::new(),
        }
    }

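    /// Returns the key of the oldest operation log entry and caches it for
    /// the rest of the runtime. If the log is still empty, nothing is cached
    /// and the DB lookup is retried on the next call.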
    async fn get_oldest_operation_log_key(&self) -> Option<ChronologicalOperationLogKey> {
        let mut dbtx = self.db.begin_transaction_nc().await;
        self.oldest_entry
            .get_or_try_init(move || async move {
                dbtx.find_by_prefix(&ChronologicalOperationLogKeyPrefix)
                    .await
                    .map(|(key, ())| key)
                    .next()
                    .await
                    .ok_or(())
            })
            .await
            .ok()
            .copied()
    }

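    /// Adds a new operation log entry, writing both the entry itself and the
    /// chronological index key used for pagination.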
    pub async fn add_operation_log_entry(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta: impl serde::Serialize,
    ) {
        dbtx.insert_new_entry(
            &OperationLogKey { operation_id },
            &OperationLogEntry {
                operation_module_kind: operation_type.to_string(),
                meta: JsonStringed(
                    serde_json::to_value(operation_meta)
                        .expect("Can only fail if meta is not serializable"),
                ),
                outcome: None,
            },
        )
        .await;
        dbtx.insert_new_entry(
            &ChronologicalOperationLogKey {
                creation_time: now(),
                operation_id,
            },
            &(),
        )
        .await;
    }

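    /// Returns the last `limit` operations. See
    /// [`Self::paginate_operations_rev`].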
    #[deprecated(since = "0.6.0", note = "Use `paginate_operations_rev` instead")]
    pub async fn list_operations(
        &self,
        limit: usize,
        last_seen: Option<ChronologicalOperationLogKey>,
    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
        self.paginate_operations_rev(limit, last_seen).await
    }

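    /// Returns the last `limit` operations, starting at `last_seen`
    /// (exclusive) and going backwards in time. Pass `None` as `last_seen` to
    /// fetch the newest page.
    ///
    /// A minimal pagination loop (sketch, mirroring the tests below):
    ///
    /// ```ignore
    /// let mut last_seen = None;
    /// loop {
    ///     let page = op_log.paginate_operations_rev(10, last_seen).await;
    ///     if page.is_empty() {
    ///         break;
    ///     }
    ///     last_seen = page.last().map(|(key, _)| *key);
    /// }
    /// ```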
    pub async fn paginate_operations_rev(
        &self,
        limit: usize,
        last_seen: Option<ChronologicalOperationLogKey>,
    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
        const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7);

        // If no `last_seen` key is given, start slightly in the future so
        // that entries created just now are included in the first page.
        let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey {
            creation_time: now() + Duration::from_secs(30),
            operation_id: OperationId([0; 32]),
        });

        let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else {
            return vec![];
        };

        let mut dbtx = self.db.begin_transaction_nc().await;
        let mut operation_log_keys = Vec::with_capacity(limit);

        // Scan the chronological index backwards one epoch at a time so we
        // can stop early once `limit` entries have been collected, instead of
        // reading the entire index.
        'outer: for key_range_rev in
            rev_epoch_ranges(start_after_key, oldest_entry_key, EPOCH_DURATION)
        {
            let epoch_operation_log_keys_rev = dbtx
                .find_by_range(key_range_rev)
                .await
                .map(|(key, ())| key)
                .collect::<Vec<_>>()
                .await;

            for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() {
                operation_log_keys.push(operation_log_key);
                if operation_log_keys.len() >= limit {
                    break 'outer;
                }
            }
        }

        debug_assert!(
            operation_log_keys.iter().collect::<HashSet<_>>().len() == operation_log_keys.len(),
            "Operation log keys returned are not unique"
        );

        let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len());
        for operation_log_key in operation_log_keys {
            let operation_log_entry = dbtx
                .get_value(&OperationLogKey {
                    operation_id: operation_log_key.operation_id,
                })
                .await
                .expect("Inconsistent DB");
            operation_log_entries.push((operation_log_key, operation_log_entry));
        }

        operation_log_entries
    }

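    /// Returns the operation log entry for the given `operation_id`, or
    /// `None` if no such operation exists.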
    pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
        Self::get_operation_inner(
            &mut self.db.begin_transaction_nc().await.into_nc(),
            operation_id,
        )
        .await
    }

    async fn get_operation_inner(
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
    ) -> Option<OperationLogEntry> {
        dbtx.get_value(&OperationLogKey { operation_id }).await
    }

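    /// Sets the outcome of an operation. Returns an error if the DB
    /// transaction cannot be committed; panics if the operation does not
    /// exist.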
    #[instrument(target = LOG_CLIENT, skip(db), level = "debug")]
    pub async fn set_operation_outcome(
        db: &Database,
        operation_id: OperationId,
        outcome: &(impl Serialize + Debug),
    ) -> anyhow::Result<()> {
        let outcome_json =
            JsonStringed(serde_json::to_value(outcome).expect("Outcome is not serializable"));

        let mut dbtx = db.begin_transaction().await;
        let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id)
            .await
            .expect("Operation exists");
        operation.outcome = Some(OperationOutcome {
            time: fedimint_core::time::now(),
            outcome: outcome_json,
        });
        dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
            .await;
        dbtx.commit_tx_result().await?;

        Ok(())
    }

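    /// Tries to set the outcome of an operation, but only logs a warning on
    /// failure instead of returning an error. Useful where the outcome is
    /// merely a cache of the update stream's final state, as in
    /// [`caching_operation_update_stream`].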
    pub async fn optimistically_set_operation_outcome(
        db: &Database,
        operation_id: OperationId,
        outcome: &(impl Serialize + Debug),
    ) {
        if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
            warn!(
                target: LOG_CLIENT,
                "Error setting operation outcome: {e}"
            );
        }
    }
}

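/// Returns an iterator over DB key ranges, walking backwards in time from
/// `start_after` (exclusive) to `last_entry` (inclusive) in steps of
/// `epoch_duration`. Scanning the log one epoch at a time lets callers stop
/// as soon as they have collected enough entries.
///
/// Worked example with times simplified to integers: for `start_after = 25`,
/// `last_entry = 6` and `epoch_duration = 10`, the yielded ranges are
/// `15..25` and `5..15`, so every entry in `6..=24` falls into exactly one
/// range, newest epoch first.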
fn rev_epoch_ranges(
    start_after: ChronologicalOperationLogKey,
    last_entry: ChronologicalOperationLogKey,
    epoch_duration: Duration,
) -> impl Iterator<Item = Range<ChronologicalOperationLogKey>> {
    // Walk backwards in time, one epoch at a time, starting at `start_after`
    // and stopping once we have gone past `last_entry`.
    (0..)
        .map(move |epoch| start_after.creation_time - epoch * epoch_duration)
        .take_while(move |&start_time| start_time >= last_entry.creation_time)
        .map(move |start_time| {
            let end_time = start_time - epoch_duration;

            // The very first range ends at `start_after` itself, which is
            // thus never returned. All other boundary keys use the all-zero
            // operation id, so an entry at exactly a boundary time falls into
            // the newer of the two adjacent ranges.
            let start_key = if start_time == start_after.creation_time {
                start_after
            } else {
                ChronologicalOperationLogKey {
                    creation_time: start_time,
                    operation_id: OperationId([0; 32]),
                }
            };

            let end_key = ChronologicalOperationLogKey {
                creation_time: end_time,
                operation_id: OperationId([0; 32]),
            };

            // The DB iterates ranges in ascending key order, so `start` is
            // the older (inclusive) bound and `end` the newer (exclusive) one.
            Range {
                start: end_key,
                end: start_key,
            }
        })
}

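/// Older (v0) operation log entry format. Unlike [`OperationLogEntry`] it
/// does not record the time at which the outcome was set.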
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    pub(crate) operation_module_kind: String,
    pub(crate) meta: JsonStringed,
    pub(crate) outcome: Option<JsonStringed>,
}

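/// Operation outcome, recorded together with the time it was set.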
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
pub(crate) struct OperationOutcome {
    pub(crate) time: SystemTime,
    pub(crate) outcome: JsonStringed,
}

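/// An entry in the operation log: the kind of the module that created the
/// operation, a module-specific JSON `meta` blob, and the final `outcome`
/// once the operation has finished.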
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntry {
    pub(crate) operation_module_kind: String,
    pub(crate) meta: JsonStringed,
    pub(crate) outcome: Option<OperationOutcome>,
}

impl OperationLogEntry {
    /// Returns the kind of the module that created the operation.
    pub fn operation_module_kind(&self) -> &str {
        &self.operation_module_kind
    }

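    /// Returns the module-specific metadata of the operation, deserialized
    /// into `M`. Panics if `M` does not match the type the meta was
    /// serialized from.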
    pub fn meta<M: DeserializeOwned>(&self) -> M {
        serde_json::from_value(self.meta.0.clone()).expect("JSON deserialization should not fail")
    }

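    /// Returns the deserialized outcome of the operation if it has finished,
    /// or `None` while it is still active. Panics if `D` does not match the
    /// serialized outcome type.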
    pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
        self.outcome.as_ref().map(|outcome| {
            serde_json::from_value(outcome.outcome.0.clone())
                .expect("JSON deserialization should not fail")
        })
    }

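    /// Returns the time the outcome was set, or `None` if the operation has
    /// not finished yet.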
    pub fn outcome_time(&self) -> Option<SystemTime> {
        self.outcome.as_ref().map(|o| o.time)
    }

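    /// Returns the cached outcome if the operation has already finished, or
    /// otherwise a stream of updates produced by `stream_gen`. The stream is
    /// wrapped so that its final item is persisted as the outcome once it
    /// ends.
    ///
    /// Usage sketch (mirroring the tests below):
    ///
    /// ```ignore
    /// let updates = entry
    ///     .outcome_or_updates::<String, _>(&db, operation_id, || update_stream)
    ///     .into_stream()
    ///     .collect::<Vec<_>>()
    ///     .await;
    /// ```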
    pub fn outcome_or_updates<U, S>(
        &self,
        db: &Database,
        operation_id: OperationId,
        stream_gen: impl FnOnce() -> S,
    ) -> UpdateStreamOrOutcome<U>
    where
        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
        S: Stream<Item = U> + MaybeSend + 'static,
    {
        match self.outcome::<U>() {
            Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
            None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
                db.clone(),
                operation_id,
                stream_gen(),
            )),
        }
    }
}

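/// Either a stream of updates for an operation that is still active, or its
/// final outcome if it has already finished.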
pub enum UpdateStreamOrOutcome<U> {
    UpdateStream(BoxStream<'static, U>),
    Outcome(U),
}

impl<U> UpdateStreamOrOutcome<U>
where
    U: MaybeSend + MaybeSync + 'static,
{
    pub fn into_stream(self) -> BoxStream<'static, U> {
        match self {
            UpdateStreamOrOutcome::UpdateStream(stream) => stream,
            UpdateStreamOrOutcome::Outcome(outcome) => {
                Box::pin(stream::once(future::ready(outcome)))
            }
        }
    }
}

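/// Wraps a stream of operation updates, forwarding each update unchanged and,
/// once the inner stream ends, persisting the last update as the operation's
/// outcome via [`OperationLog::optimistically_set_operation_outcome`].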
pub fn caching_operation_update_stream<'a, U, S>(
    db: Database,
    operation_id: OperationId,
    stream: S,
) -> BoxStream<'a, U>
where
    U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
    S: Stream<Item = U> + MaybeSend + 'a,
{
    let mut stream = Box::pin(stream);
    Box::pin(stream! {
        let mut last_update = None;
        while let Some(update) = stream.next().await {
            yield update.clone();
            last_update = Some(update);
        }

        let Some(last_update) = last_update else {
            error!(
                target: LOG_CLIENT,
                "Stream ended without any updates, this should not happen!"
            );
            return;
        };

        OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
    })
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use fedimint_core::core::OperationId;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::{
        Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, IRawDatabaseExt,
    };
    use fedimint_core::module::registry::ModuleRegistry;
    use futures::stream::StreamExt;
    use serde::{Deserialize, Serialize};

    use super::UpdateStreamOrOutcome;
    use crate::db::{ChronologicalOperationLogKey, OperationLogKey};
    use crate::oplog::{JsonStringed, OperationLog, OperationLogEntry, OperationOutcome};

    #[test]
    fn test_operation_log_entry_serde() {
        let op_log = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: JsonStringed(serde_json::to_value(()).unwrap()),
            outcome: None,
        };

        op_log.meta::<()>();
    }

    #[test]
    fn test_operation_log_entry_serde_extra_meta() {
        #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
        struct Meta {
            foo: String,
            extra_meta: serde_json::Value,
        }

        let meta = Meta {
            foo: "bar".to_string(),
            extra_meta: serde_json::to_value(()).unwrap(),
        };

        let op_log = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: JsonStringed(serde_json::to_value(meta.clone()).unwrap()),
            outcome: Some(OperationOutcome {
                time: fedimint_core::time::now(),
                outcome: JsonStringed(serde_json::to_value("test_outcome").unwrap()),
            }),
        };

        assert_eq!(op_log.meta::<Meta>(), meta);
    }

    #[tokio::test]
    async fn test_operation_log_update() {
        let op_id = OperationId([0x32; 32]);

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op.outcome, None);

        OperationLog::set_operation_outcome(&db, op_id, &"baz")
            .await
            .unwrap();

        let op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op.outcome::<String>(), Some("baz".to_string()));
        assert!(op.outcome_time().is_some(), "outcome_time should be set");

        let update_stream_or_outcome =
            op.outcome_or_updates::<String, _>(&db, op_id, futures::stream::empty);

        assert!(matches!(
            &update_stream_or_outcome,
            UpdateStreamOrOutcome::Outcome(s) if s == "baz"
        ));

        let updates = update_stream_or_outcome
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(updates, vec!["baz"]);
    }

    #[tokio::test]
    async fn test_operation_log_update_from_stream() {
        let op_id = OperationId([0x32; 32]);

        let db = MemDatabase::new().into_database();
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let op = op_log.get_operation(op_id).await.expect("op exists");

        let updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()];
        let update_stream = op.outcome_or_updates::<String, _>(&db, op_id, || {
            futures::stream::iter(updates.clone())
        });

        let received_updates = update_stream.into_stream().collect::<Vec<_>>().await;
        assert_eq!(received_updates, updates);

        let op_updated = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op_updated.outcome::<String>(), Some("baz".to_string()));
        assert!(
            op_updated.outcome_time().is_some(),
            "outcome_time should be set after stream completion"
        );
    }

    #[tokio::test]
    async fn test_pagination() {
        fn assert_page_entries(
            page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>,
            page_idx: u8,
        ) {
            for (entry_idx, (_key, entry)) in page.into_iter().enumerate() {
                let actual_meta = entry.meta::<u8>();
                let expected_meta = 97 - (page_idx * 10 + entry_idx as u8);

                assert_eq!(actual_meta, expected_meta);
            }
        }

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        for operation_idx in 0u8..98 {
            let mut dbtx = db.begin_transaction().await;
            op_log
                .add_operation_log_entry(
                    &mut dbtx.to_ref_nc(),
                    OperationId([operation_idx; 32]),
                    "foo",
                    operation_idx,
                )
                .await;
            dbtx.commit_tx().await;
        }

        let mut previous_last_element = None;
        for page_idx in 0u8..9 {
            let page = op_log
                .paginate_operations_rev(10, previous_last_element)
                .await;
            assert_eq!(page.len(), 10);
            previous_last_element = Some(page[9].0);
            assert_page_entries(page, page_idx);
        }

        let page = op_log
            .paginate_operations_rev(10, previous_last_element)
            .await;
        assert_eq!(page.len(), 8);
        assert_page_entries(page, 9);
    }

    #[tokio::test]
    async fn test_pagination_empty() {
        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let page = op_log.paginate_operations_rev(10, None).await;
        assert!(page.is_empty());
    }

    #[tokio::test]
    async fn test_pagination_multiple_operations_same_time() {
        async fn insert_oplog(dbtx: &mut DatabaseTransaction<'_>, idx: u8, time: u64) {
            let operation_id = OperationId([idx; 32]);
            // Roughly 40 years after the Unix epoch (~2010), plus `time` days
            let creation_time = SystemTime::UNIX_EPOCH
                + Duration::from_secs(60 * 60 * 24 * 365 * 40)
                + Duration::from_secs(time * 60 * 60 * 24);

            dbtx.insert_new_entry(
                &OperationLogKey { operation_id },
                &OperationLogEntry {
                    operation_module_kind: "operation_type".to_string(),
                    meta: JsonStringed(serde_json::Value::Null),
                    outcome: None,
                },
            )
            .await;
            dbtx.insert_new_entry(
                &ChronologicalOperationLogKey {
                    creation_time,
                    operation_id,
                },
                &(),
            )
            .await;
        }

        async fn assert_pages(operation_log: &OperationLog, pages: Vec<Vec<u8>>) {
            let mut previous_last_element: Option<ChronologicalOperationLogKey> = None;
            for reference_page in pages {
                let page = operation_log
                    .paginate_operations_rev(10, previous_last_element)
                    .await;
                assert_eq!(page.len(), reference_page.len());
                assert_eq!(
                    page.iter()
                        .map(|(operation_log_key, _)| operation_log_key.operation_id)
                        .collect::<Vec<_>>(),
                    reference_page
                        .iter()
                        .map(|&x| OperationId([x; 32]))
                        .collect::<Vec<_>>()
                );
                previous_last_element = page.last().map(|(key, _)| key).copied();
            }
        }

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        for operation_idx in 0u8..10 {
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 1).await;
        }
        dbtx.commit_tx().await;
        assert_pages(&op_log, vec![vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0], vec![]]).await;

        let mut dbtx = db.begin_transaction().await;
        for operation_idx in 10u8..16 {
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 2).await;
        }
        for operation_idx in 16u8..22 {
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 3).await;
        }
        dbtx.commit_tx().await;
        assert_pages(
            &op_log,
            vec![
                vec![21, 20, 19, 18, 17, 16, 15, 14, 13, 12],
                vec![11, 10, 9, 8, 7, 6, 5, 4, 3, 2],
                vec![1, 0],
                vec![],
            ],
        )
        .await;

        let mut dbtx = db.begin_transaction().await;
        for operation_idx in 22u8..31 {
            insert_oplog(
                &mut dbtx.to_ref_nc(),
                operation_idx,
                10 * u64::from(operation_idx),
            )
            .await;
        }
        dbtx.commit_tx().await;
        assert_pages(
            &op_log,
            vec![
                vec![30, 29, 28, 27, 26, 25, 24, 23, 22, 21],
                vec![20, 19, 18, 17, 16, 15, 14, 13, 12, 11],
                vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
                vec![0],
                vec![],
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_pagination_empty_then_not() {
        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        // Exercises the `oldest_entry` cache: paginating while the log is
        // empty must not permanently cache the "no entries" result.
        let page = op_log.paginate_operations_rev(10, None).await;
        assert!(page.is_empty());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), OperationId([0; 32]), "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let page = op_log.paginate_operations_rev(10, None).await;
        assert_eq!(page.len(), 1);
    }
}