fedimint_client/
oplog.rs

1use std::fmt::Debug;
2use std::future;
3use std::io::{Read, Write};
4
5use async_stream::stream;
6use fedimint_core::core::OperationId;
7use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
9use fedimint_core::module::registry::ModuleDecoderRegistry;
10use fedimint_core::task::{MaybeSend, MaybeSync};
11use fedimint_core::time::now;
12use fedimint_core::util::BoxStream;
13use futures::{stream, Stream, StreamExt};
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use tracing::{error, instrument, warn};
17
18use crate::db::{
19    ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey,
20};
21
22#[derive(Debug, Clone)]
23pub struct OperationLog {
24    db: Database,
25}
26
27impl OperationLog {
28    pub fn new(db: Database) -> Self {
29        Self { db }
30    }
31
32    pub async fn add_operation_log_entry(
33        &self,
34        dbtx: &mut DatabaseTransaction<'_>,
35        operation_id: OperationId,
36        operation_type: &str,
37        operation_meta: impl serde::Serialize,
38    ) {
39        dbtx.insert_new_entry(
40            &OperationLogKey { operation_id },
41            &OperationLogEntry {
42                operation_module_kind: operation_type.to_string(),
43                meta: serde_json::to_value(operation_meta)
44                    .expect("Can only fail if meta is not serializable"),
45                outcome: None,
46            },
47        )
48        .await;
49        dbtx.insert_new_entry(
50            &ChronologicalOperationLogKey {
51                creation_time: now(),
52                operation_id,
53            },
54            &(),
55        )
56        .await;
57    }
58
59    /// Returns the last `limit` operations. To fetch the next page, pass the
60    /// last operation's [`ChronologicalOperationLogKey`] as `start_after`.
61    pub async fn list_operations(
62        &self,
63        limit: usize,
64        start_after: Option<ChronologicalOperationLogKey>,
65    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
66        let mut dbtx = self.db.begin_transaction_nc().await;
67        let operations: Vec<ChronologicalOperationLogKey> = dbtx
68            .find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix)
69            .await
70            .map(|(key, ())| key)
71            // FIXME: this is a schlemil-the-painter algorithm that will take longer the further
72            // back in history one goes. To avoid that I see two options:
73            //   1. Add a reference to the previous operation to each operation log entry,
74            //      essentially creating a linked list, which seem a little bit inelegant.
75            //   2. Add an option to prefix queries that allows to specify a start key
76            //
77            // The current implementation may also skip operations due to `SystemTime` not being
78            // guaranteed to be monotonous. The linked list approach would also fix that.
79            .skip_while(|key| {
80                let skip = if let Some(start_after) = start_after {
81                    key.creation_time >= start_after.creation_time
82                } else {
83                    false
84                };
85
86                std::future::ready(skip)
87            })
88            .take(limit)
89            .collect::<Vec<_>>()
90            .await;
91
92        let mut operation_entries = Vec::with_capacity(operations.len());
93
94        for operation in operations {
95            let entry = dbtx
96                .get_value(&OperationLogKey {
97                    operation_id: operation.operation_id,
98                })
99                .await
100                .expect("Inconsistent DB");
101            operation_entries.push((operation, entry));
102        }
103
104        operation_entries
105    }
106
107    pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
108        Self::get_operation_inner(
109            &mut self.db.begin_transaction_nc().await.into_nc(),
110            operation_id,
111        )
112        .await
113    }
114
115    async fn get_operation_inner(
116        dbtx: &mut DatabaseTransaction<'_>,
117        operation_id: OperationId,
118    ) -> Option<OperationLogEntry> {
119        dbtx.get_value(&OperationLogKey { operation_id }).await
120    }
121
122    /// Sets the outcome of an operation
123    #[instrument(skip(db), level = "debug")]
124    pub async fn set_operation_outcome(
125        db: &Database,
126        operation_id: OperationId,
127        outcome: &(impl Serialize + Debug),
128    ) -> anyhow::Result<()> {
129        let outcome_json = serde_json::to_value(outcome).expect("Outcome is not serializable");
130
131        let mut dbtx = db.begin_transaction().await;
132        let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id)
133            .await
134            .expect("Operation exists");
135        operation.outcome = Some(outcome_json);
136        dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
137            .await;
138        dbtx.commit_tx_result().await?;
139
140        Ok(())
141    }
142
143    /// Tries to set the outcome of an operation, but only logs an error if it
144    /// fails and does not return it. Since the outcome can always be recomputed
145    /// from an update stream, failing to save it isn't a problem in cases where
146    /// we do this merely for caching.
147    pub async fn optimistically_set_operation_outcome(
148        db: &Database,
149        operation_id: OperationId,
150        outcome: &(impl Serialize + Debug),
151    ) {
152        if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
153            warn!("Error setting operation outcome: {e}");
154        }
155    }
156}
157
158/// Represents an operation triggered by a user, typically related to sending or
159/// receiving money.
160///
161/// There are three levels of introspection possible for `OperationLogEntry`s:
162///   1. The [`OperationLogEntry::operation_module_kind`] function returns the
163///      kind of the module that created the operation.
164///   2. The [`OperationLogEntry::meta`] function returns static meta data that
165///      was associated with the operation when it was created. Modules define
166///      their own meta structures, so the module kind has to be used to
167///      determine the structure of the meta data.
168///   3. To find out the current state of the operation there is a two-step
169///      process:
170///      * First, the [`OperationLogEntry::outcome`] function returns the
171///        outcome if the operation finished **and** the update subscription
172///        stream has been processed till its end at least once.
173///      * If that isn't the case, the [`OperationLogEntry::outcome`] method
174///        will return `None` and the appropriate update subscription function
175///        has to be called. See the respective client extension trait for these
176///        functions.
177#[derive(Debug, Serialize, Deserialize)]
178pub struct OperationLogEntry {
179    operation_module_kind: String,
180    meta: serde_json::Value,
181    // TODO: probably change all that JSON to Dyn-types
182    pub(crate) outcome: Option<serde_json::Value>,
183}
184
185impl OperationLogEntry {
186    /// Returns the kind of the module that generated the operation
187    pub fn operation_module_kind(&self) -> &str {
188        &self.operation_module_kind
189    }
190
191    /// Returns the meta data of the operation. This is a JSON value that can be
192    /// either returned as a [`serde_json::Value`] or deserialized into a
193    /// specific type. The specific type should be named `<Module>OperationMeta`
194    /// in the module's client crate. The module can be determined by calling
195    /// [`OperationLogEntry::operation_module_kind`].
196    pub fn meta<M: DeserializeOwned>(&self) -> M {
197        serde_json::from_value(self.meta.clone()).expect("JSON deserialization should not fail")
198    }
199
200    /// Returns the last state update of the operation, if any was cached yet.
201    /// If this hasn't been the case yet and `None` is returned subscribe to the
202    /// appropriate update stream.
203    ///
204    /// ## Determining the return type
205    /// [`OperationLogEntry::meta`] should tell you the which operation type of
206    /// a given module the outcome belongs to. The operation type will have a
207    /// corresponding `async fn subscribe_type(&self, operation_id:
208    /// OperationId) -> anyhow::Result<UpdateStreamOrOutcome<TypeState>>;`
209    /// function that returns a `UpdateStreamOrOutcome<S>` where `S` is the
210    /// high-level state the operation is in. If this state is terminal, i.e.
211    /// the stream closes after returning it, it will be cached as the `outcome`
212    /// of the operation.
213    ///
214    /// This means the type to be used for deserializing the outcome is `S`,
215    /// often called `<OperationType>State`. Alternatively one can also use
216    /// [`serde_json::Value`] to get the unstructured data.
217    pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
218        self.outcome.as_ref().map(|outcome| {
219            serde_json::from_value(outcome.clone()).expect("JSON deserialization should not fail")
220        })
221    }
222
223    /// Returns an a [`UpdateStreamOrOutcome`] enum that can be converted into
224    /// an update stream for easier handling using
225    /// [`UpdateStreamOrOutcome::into_stream`] but can also be matched over to
226    /// shortcut the handling of final outcomes.
227    pub fn outcome_or_updates<U, S>(
228        &self,
229        db: &Database,
230        operation_id: OperationId,
231        stream_gen: impl FnOnce() -> S,
232    ) -> UpdateStreamOrOutcome<U>
233    where
234        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
235        S: Stream<Item = U> + MaybeSend + 'static,
236    {
237        match self.outcome::<U>() {
238            Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
239            None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
240                db.clone(),
241                operation_id,
242                stream_gen(),
243            )),
244        }
245    }
246}
247
248impl Encodable for OperationLogEntry {
249    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
250        let mut len = 0;
251        len += self.operation_module_kind.consensus_encode(writer)?;
252        len += serde_json::to_string(&self.meta)
253            .expect("JSON serialization should not fail")
254            .consensus_encode(writer)?;
255        len += self
256            .outcome
257            .as_ref()
258            .map(|outcome| {
259                serde_json::to_string(outcome).expect("JSON serialization should not fail")
260            })
261            .consensus_encode(writer)?;
262
263        Ok(len)
264    }
265}
266
267impl Decodable for OperationLogEntry {
268    fn consensus_decode<R: Read>(
269        r: &mut R,
270        modules: &ModuleDecoderRegistry,
271    ) -> Result<Self, DecodeError> {
272        let operation_type = String::consensus_decode(r, modules)?;
273
274        let meta_str = String::consensus_decode(r, modules)?;
275        let meta = serde_json::from_str(&meta_str).map_err(DecodeError::from_err)?;
276
277        let outcome_str = Option::<String>::consensus_decode(r, modules)?;
278        let outcome = outcome_str
279            .map(|outcome_str| serde_json::from_str(&outcome_str).map_err(DecodeError::from_err))
280            .transpose()?;
281
282        Ok(OperationLogEntry {
283            operation_module_kind: operation_type,
284            meta,
285            outcome,
286        })
287    }
288}
289
290/// Either a stream of operation updates if the operation hasn't finished yet or
291/// its outcome otherwise.
292pub enum UpdateStreamOrOutcome<U> {
293    UpdateStream(BoxStream<'static, U>),
294    Outcome(U),
295}
296
297impl<U> UpdateStreamOrOutcome<U>
298where
299    U: MaybeSend + MaybeSync + 'static,
300{
301    /// Returns a stream no matter if the operation is finished. If there
302    /// already is a cached outcome the stream will only return that, otherwise
303    /// all updates will be returned until the operation finishes.
304    pub fn into_stream(self) -> BoxStream<'static, U> {
305        match self {
306            UpdateStreamOrOutcome::UpdateStream(stream) => stream,
307            UpdateStreamOrOutcome::Outcome(outcome) => {
308                Box::pin(stream::once(future::ready(outcome)))
309            }
310        }
311    }
312}
313
314/// Wraps an operation update stream such that the last update before it closes
315/// is tried to be written to the operation log entry as its outcome.
316pub fn caching_operation_update_stream<'a, U, S>(
317    db: Database,
318    operation_id: OperationId,
319    stream: S,
320) -> BoxStream<'a, U>
321where
322    U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
323    S: Stream<Item = U> + MaybeSend + 'a,
324{
325    let mut stream = Box::pin(stream);
326    Box::pin(stream! {
327        let mut last_update = None;
328        while let Some(update) = stream.next().await {
329            yield update.clone();
330            last_update = Some(update);
331        }
332
333        let Some(last_update) = last_update else {
334            error!("Stream ended without any updates, this should not happen!");
335            return;
336        };
337
338        OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
339    })
340}
341
#[cfg(test)]
mod tests {
    use fedimint_core::core::OperationId;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::{Database, IRawDatabaseExt};
    use fedimint_core::module::registry::ModuleRegistry;
    use futures::stream::StreamExt;
    use serde::{Deserialize, Serialize};

    use super::UpdateStreamOrOutcome;
    use crate::db::ChronologicalOperationLogKey;
    use crate::oplog::{OperationLog, OperationLogEntry};

    /// A unit meta value can be read back through `meta` without panicking.
    #[test]
    fn test_operation_log_entry_serde() {
        let entry = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: serde_json::to_value(()).unwrap(),
            outcome: None,
        };

        entry.meta::<()>();
    }

    /// A structured meta value containing a nested JSON value round-trips.
    #[test]
    fn test_operation_log_entry_serde_extra_meta() {
        #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
        struct Meta {
            foo: String,
            extra_meta: serde_json::Value,
        }

        let expected = Meta {
            foo: "bar".to_string(),
            extra_meta: serde_json::to_value(()).unwrap(),
        };

        let entry = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: serde_json::to_value(expected.clone()).unwrap(),
            outcome: None,
        };

        assert_eq!(entry.meta::<Meta>(), expected);
    }

    /// Setting an outcome explicitly makes it visible through `outcome` and
    /// turns `outcome_or_updates` into the `Outcome` variant.
    #[tokio::test]
    async fn test_operation_log_update() {
        let operation_id = OperationId([0x32; 32]);

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), operation_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        // A freshly added operation has no outcome.
        let fresh = op_log.get_operation(operation_id).await.expect("op exists");
        assert_eq!(fresh.outcome, None);

        OperationLog::set_operation_outcome(&db, operation_id, &"baz")
            .await
            .unwrap();

        let finished = op_log.get_operation(operation_id).await.expect("op exists");
        assert_eq!(finished.outcome::<String>(), Some("baz".to_string()));

        let stream_or_outcome =
            finished.outcome_or_updates::<String, _>(&db, operation_id, futures::stream::empty);

        assert!(matches!(
            &stream_or_outcome,
            UpdateStreamOrOutcome::Outcome(s) if s == "baz"
        ));

        // Converted into a stream, the cached outcome is the only item.
        let collected = stream_or_outcome.into_stream().collect::<Vec<_>>().await;
        assert_eq!(collected, vec!["baz"]);
    }

    /// Draining the update stream stores its last item as the outcome.
    #[tokio::test]
    async fn test_operation_log_update_from_stream() {
        let operation_id = OperationId([0x32; 32]);

        let db = MemDatabase::new().into_database();
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), operation_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let entry = op_log.get_operation(operation_id).await.expect("op exists");

        let expected_updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()];
        let stream_or_outcome = entry.outcome_or_updates::<String, _>(&db, operation_id, || {
            futures::stream::iter(expected_updates.clone())
        });

        let collected = stream_or_outcome.into_stream().collect::<Vec<_>>().await;
        assert_eq!(collected, expected_updates);

        let reloaded = op_log.get_operation(operation_id).await.expect("op exists");
        assert_eq!(reloaded.outcome::<String>(), Some("baz".to_string()));
    }

    /// 98 operations are listed newest-first in pages of ten, with a final
    /// page of eight.
    #[tokio::test]
    async fn test_pagination() {
        /// Checks that `page` holds the metas expected at position `page_idx`
        /// when counting down from 97 in steps of one.
        fn assert_page_entries(
            page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>,
            page_idx: u8,
        ) {
            for (entry_idx, (_key, entry)) in page.into_iter().enumerate() {
                let actual_meta = entry.meta::<u8>();
                let expected_meta = 97 - (page_idx * 10 + entry_idx as u8);

                assert_eq!(actual_meta, expected_meta);
            }
        }

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        // Each operation uses its index both as id bytes and as meta.
        for operation_idx in 0u8..98 {
            let mut dbtx = db.begin_transaction().await;
            op_log
                .add_operation_log_entry(
                    &mut dbtx.to_ref_nc(),
                    OperationId([operation_idx; 32]),
                    "foo",
                    operation_idx,
                )
                .await;
            dbtx.commit_tx().await;
        }

        // Nine full pages, each continuing after the previous page's last key.
        let mut cursor = None;
        for page_idx in 0u8..9 {
            let page = op_log.list_operations(10, cursor).await;
            assert_eq!(page.len(), 10);
            cursor = Some(page[9].0);
            assert_page_entries(page, page_idx);
        }

        // The remaining eight operations form the last page.
        let last_page = op_log.list_operations(10, cursor).await;
        assert_eq!(last_page.len(), 8);
        assert_page_entries(last_page, 9);
    }
}