1use std::fmt::Debug;
2use std::future;
3use std::io::{Read, Write};
4
5use async_stream::stream;
6use fedimint_core::core::OperationId;
7use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
9use fedimint_core::module::registry::ModuleDecoderRegistry;
10use fedimint_core::task::{MaybeSend, MaybeSync};
11use fedimint_core::time::now;
12use fedimint_core::util::BoxStream;
13use futures::{stream, Stream, StreamExt};
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use tracing::{error, instrument, warn};
17
18use crate::db::{
19 ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey,
20};
21
/// Client-side log of user-initiated operations.
///
/// Backed by two database tables: one entry per operation keyed by its
/// [`OperationId`], plus a chronological index used for time-ordered,
/// paginated listing (see `add_operation_log_entry` / `list_operations`).
#[derive(Debug, Clone)]
pub struct OperationLog {
    db: Database,
}
26
27impl OperationLog {
28 pub fn new(db: Database) -> Self {
29 Self { db }
30 }
31
32 pub async fn add_operation_log_entry(
33 &self,
34 dbtx: &mut DatabaseTransaction<'_>,
35 operation_id: OperationId,
36 operation_type: &str,
37 operation_meta: impl serde::Serialize,
38 ) {
39 dbtx.insert_new_entry(
40 &OperationLogKey { operation_id },
41 &OperationLogEntry {
42 operation_module_kind: operation_type.to_string(),
43 meta: serde_json::to_value(operation_meta)
44 .expect("Can only fail if meta is not serializable"),
45 outcome: None,
46 },
47 )
48 .await;
49 dbtx.insert_new_entry(
50 &ChronologicalOperationLogKey {
51 creation_time: now(),
52 operation_id,
53 },
54 &(),
55 )
56 .await;
57 }
58
59 pub async fn list_operations(
62 &self,
63 limit: usize,
64 start_after: Option<ChronologicalOperationLogKey>,
65 ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
66 let mut dbtx = self.db.begin_transaction_nc().await;
67 let operations: Vec<ChronologicalOperationLogKey> = dbtx
68 .find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix)
69 .await
70 .map(|(key, ())| key)
71 .skip_while(|key| {
80 let skip = if let Some(start_after) = start_after {
81 key.creation_time >= start_after.creation_time
82 } else {
83 false
84 };
85
86 std::future::ready(skip)
87 })
88 .take(limit)
89 .collect::<Vec<_>>()
90 .await;
91
92 let mut operation_entries = Vec::with_capacity(operations.len());
93
94 for operation in operations {
95 let entry = dbtx
96 .get_value(&OperationLogKey {
97 operation_id: operation.operation_id,
98 })
99 .await
100 .expect("Inconsistent DB");
101 operation_entries.push((operation, entry));
102 }
103
104 operation_entries
105 }
106
107 pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
108 Self::get_operation_inner(
109 &mut self.db.begin_transaction_nc().await.into_nc(),
110 operation_id,
111 )
112 .await
113 }
114
115 async fn get_operation_inner(
116 dbtx: &mut DatabaseTransaction<'_>,
117 operation_id: OperationId,
118 ) -> Option<OperationLogEntry> {
119 dbtx.get_value(&OperationLogKey { operation_id }).await
120 }
121
122 #[instrument(skip(db), level = "debug")]
124 pub async fn set_operation_outcome(
125 db: &Database,
126 operation_id: OperationId,
127 outcome: &(impl Serialize + Debug),
128 ) -> anyhow::Result<()> {
129 let outcome_json = serde_json::to_value(outcome).expect("Outcome is not serializable");
130
131 let mut dbtx = db.begin_transaction().await;
132 let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id)
133 .await
134 .expect("Operation exists");
135 operation.outcome = Some(outcome_json);
136 dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
137 .await;
138 dbtx.commit_tx_result().await?;
139
140 Ok(())
141 }
142
143 pub async fn optimistically_set_operation_outcome(
148 db: &Database,
149 operation_id: OperationId,
150 outcome: &(impl Serialize + Debug),
151 ) {
152 if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
153 warn!("Error setting operation outcome: {e}");
154 }
155 }
156}
157
/// A single entry in the [`OperationLog`].
#[derive(Debug, Serialize, Deserialize)]
pub struct OperationLogEntry {
    // Module kind string supplied as `operation_type` when the entry was created
    operation_module_kind: String,
    // Caller-supplied JSON metadata, set once at creation and never changed
    meta: serde_json::Value,
    // JSON-serialized final outcome; `None` while the operation is still active
    pub(crate) outcome: Option<serde_json::Value>,
}
184
185impl OperationLogEntry {
186 pub fn operation_module_kind(&self) -> &str {
188 &self.operation_module_kind
189 }
190
191 pub fn meta<M: DeserializeOwned>(&self) -> M {
197 serde_json::from_value(self.meta.clone()).expect("JSON deserialization should not fail")
198 }
199
200 pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
218 self.outcome.as_ref().map(|outcome| {
219 serde_json::from_value(outcome.clone()).expect("JSON deserialization should not fail")
220 })
221 }
222
223 pub fn outcome_or_updates<U, S>(
228 &self,
229 db: &Database,
230 operation_id: OperationId,
231 stream_gen: impl FnOnce() -> S,
232 ) -> UpdateStreamOrOutcome<U>
233 where
234 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
235 S: Stream<Item = U> + MaybeSend + 'static,
236 {
237 match self.outcome::<U>() {
238 Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
239 None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
240 db.clone(),
241 operation_id,
242 stream_gen(),
243 )),
244 }
245 }
246}
247
248impl Encodable for OperationLogEntry {
249 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
250 let mut len = 0;
251 len += self.operation_module_kind.consensus_encode(writer)?;
252 len += serde_json::to_string(&self.meta)
253 .expect("JSON serialization should not fail")
254 .consensus_encode(writer)?;
255 len += self
256 .outcome
257 .as_ref()
258 .map(|outcome| {
259 serde_json::to_string(outcome).expect("JSON serialization should not fail")
260 })
261 .consensus_encode(writer)?;
262
263 Ok(len)
264 }
265}
266
267impl Decodable for OperationLogEntry {
268 fn consensus_decode<R: Read>(
269 r: &mut R,
270 modules: &ModuleDecoderRegistry,
271 ) -> Result<Self, DecodeError> {
272 let operation_type = String::consensus_decode(r, modules)?;
273
274 let meta_str = String::consensus_decode(r, modules)?;
275 let meta = serde_json::from_str(&meta_str).map_err(DecodeError::from_err)?;
276
277 let outcome_str = Option::<String>::consensus_decode(r, modules)?;
278 let outcome = outcome_str
279 .map(|outcome_str| serde_json::from_str(&outcome_str).map_err(DecodeError::from_err))
280 .transpose()?;
281
282 Ok(OperationLogEntry {
283 operation_module_kind: operation_type,
284 meta,
285 outcome,
286 })
287 }
288}
289
/// Either a stream of in-progress updates for a still-running operation, or
/// the already-recorded final outcome of a finished one.
pub enum UpdateStreamOrOutcome<U> {
    // Operation still active: yields updates as they happen
    UpdateStream(BoxStream<'static, U>),
    // Operation finished: the cached final state
    Outcome(U),
}
296
297impl<U> UpdateStreamOrOutcome<U>
298where
299 U: MaybeSend + MaybeSync + 'static,
300{
301 pub fn into_stream(self) -> BoxStream<'static, U> {
305 match self {
306 UpdateStreamOrOutcome::UpdateStream(stream) => stream,
307 UpdateStreamOrOutcome::Outcome(outcome) => {
308 Box::pin(stream::once(future::ready(outcome)))
309 }
310 }
311 }
312}
313
/// Wraps an operation's update stream so that, once the inner stream ends,
/// the last yielded update is persisted as the operation's final outcome.
///
/// Every update is forwarded to the consumer unchanged; only after the inner
/// stream is exhausted is the outcome written (best-effort — persistence
/// failures are logged, not surfaced to the stream consumer).
pub fn caching_operation_update_stream<'a, U, S>(
    db: Database,
    operation_id: OperationId,
    stream: S,
) -> BoxStream<'a, U>
where
    U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
    S: Stream<Item = U> + MaybeSend + 'a,
{
    let mut stream = Box::pin(stream);
    Box::pin(stream! {
        // Remember the most recent update; it becomes the cached outcome.
        let mut last_update = None;
        while let Some(update) = stream.next().await {
            yield update.clone();
            last_update = Some(update);
        }

        // An update stream is expected to yield at least one item before
        // completing; an empty stream leaves the operation without outcome.
        let Some(last_update) = last_update else {
            error!("Stream ended without any updates, this should not happen!");
            return;
        };

        OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
    })
}
341
#[cfg(test)]
mod tests {
    use fedimint_core::core::OperationId;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::{Database, IRawDatabaseExt};
    use fedimint_core::module::registry::ModuleRegistry;
    use futures::stream::StreamExt;
    use serde::{Deserialize, Serialize};

    use super::UpdateStreamOrOutcome;
    use crate::db::ChronologicalOperationLogKey;
    use crate::oplog::{OperationLog, OperationLogEntry};

    // Smoke test: unit meta survives the JSON round trip without panicking.
    #[test]
    fn test_operation_log_entry_serde() {
        let op_log = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: serde_json::to_value(()).unwrap(),
            outcome: None,
        };

        op_log.meta::<()>();
    }

    // Structured meta (including a nested serde_json::Value field) must
    // round-trip through the log entry unchanged.
    #[test]
    fn test_operation_log_entry_serde_extra_meta() {
        #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
        struct Meta {
            foo: String,
            extra_meta: serde_json::Value,
        }

        let meta = Meta {
            foo: "bar".to_string(),
            extra_meta: serde_json::to_value(()).unwrap(),
        };

        let op_log = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: serde_json::to_value(meta.clone()).unwrap(),
            outcome: None,
        };

        assert_eq!(op_log.meta::<Meta>(), meta);
    }

    // End-to-end: create an entry, set its outcome explicitly, and check
    // that `outcome_or_updates` short-circuits to the cached outcome.
    #[tokio::test]
    async fn test_operation_log_update() {
        let op_id = OperationId([0x32; 32]);

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        // Freshly created operations have no outcome yet.
        let op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op.outcome, None);

        OperationLog::set_operation_outcome(&db, op_id, &"baz")
            .await
            .unwrap();

        let op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op.outcome::<String>(), Some("baz".to_string()));

        // With an outcome recorded, the stream generator must not be used.
        let update_stream_or_outcome =
            op.outcome_or_updates::<String, _>(&db, op_id, futures::stream::empty);

        assert!(matches!(
            &update_stream_or_outcome,
            UpdateStreamOrOutcome::Outcome(s) if s == "baz"
        ));

        // The outcome variant still converts into a one-element stream.
        let updates = update_stream_or_outcome
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(updates, vec!["baz"]);
    }

    // The caching update stream must forward all updates and persist the
    // last one as the operation's outcome once the stream ends.
    #[tokio::test]
    async fn test_operation_log_update_from_stream() {
        let op_id = OperationId([0x32; 32]);

        let db = MemDatabase::new().into_database();
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let op = op_log.get_operation(op_id).await.expect("op exists");

        let updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()];
        let update_stream = op
            .outcome_or_updates::<String, _>(&db, op_id, || futures::stream::iter(updates.clone()));

        let received_updates = update_stream.into_stream().collect::<Vec<_>>().await;
        assert_eq!(received_updates, updates);

        // The final update ("baz") is now the cached outcome.
        let op_updated = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op_updated.outcome::<String>(), Some("baz".to_string()));
    }

    // Pagination: 98 entries, pages of 10, newest first. Each entry's meta
    // is its creation index, so page p entry e must hold 97 - (10p + e).
    #[tokio::test]
    async fn test_pagination() {
        fn assert_page_entries(
            page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>,
            page_idx: u8,
        ) {
            for (entry_idx, (_key, entry)) in page.into_iter().enumerate() {
                let actual_meta = entry.meta::<u8>();
                let expected_meta = 97 - (page_idx * 10 + entry_idx as u8);

                assert_eq!(actual_meta, expected_meta);
            }
        }

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        // One transaction per entry so each gets its own creation timestamp.
        for operation_idx in 0u8..98 {
            let mut dbtx = db.begin_transaction().await;
            op_log
                .add_operation_log_entry(
                    &mut dbtx.to_ref_nc(),
                    OperationId([operation_idx; 32]),
                    "foo",
                    operation_idx,
                )
                .await;
            dbtx.commit_tx().await;
        }

        // Walk nine full pages using the last key of each page as cursor.
        let mut previous_last_element = None;
        for page_idx in 0u8..9 {
            let page = op_log.list_operations(10, previous_last_element).await;
            assert_eq!(page.len(), 10);
            previous_last_element = Some(page[9].0);
            assert_page_entries(page, page_idx);
        }

        // The final page holds the remaining 8 entries.
        let page = op_log.list_operations(10, previous_last_element).await;
        assert_eq!(page.len(), 8);
        assert_page_entries(page, 9);
    }
}