fedimint_client/
oplog.rs

1use std::collections::HashSet;
2use std::fmt::Debug;
3use std::future;
4use std::ops::Range;
5use std::time::{Duration, SystemTime};
6
7use async_stream::stream;
8use fedimint_core::core::OperationId;
9use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
10use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
11use fedimint_core::module::registry::ModuleDecoderRegistry;
12use fedimint_core::task::{MaybeSend, MaybeSync};
13use fedimint_core::time::now;
14use fedimint_core::util::BoxStream;
15use fedimint_logging::LOG_CLIENT;
16use futures::{stream, Stream, StreamExt};
17use serde::de::DeserializeOwned;
18use serde::{Deserialize, Serialize};
19use tokio::sync::OnceCell;
20use tracing::{error, instrument, warn};
21
22use crate::db::{
23    ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey,
24};
25
26/// Json value using string representation as db encoding.
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(transparent)]
29pub struct JsonStringed(pub serde_json::Value);
30
31impl Encodable for JsonStringed {
32    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
33        let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
34        json_str.consensus_encode(writer)
35    }
36}
37
38impl Decodable for JsonStringed {
39    fn consensus_decode_partial<R: std::io::Read>(
40        r: &mut R,
41        modules: &ModuleDecoderRegistry,
42    ) -> Result<Self, DecodeError> {
43        let json_str = String::consensus_decode_partial(r, modules)?;
44        let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
45        Ok(JsonStringed(value))
46    }
47}
48
49#[derive(Debug, Clone)]
50pub struct OperationLog {
51    db: Database,
52    oldest_entry: OnceCell<ChronologicalOperationLogKey>,
53}
54
55impl OperationLog {
56    pub fn new(db: Database) -> Self {
57        Self {
58            db,
59            oldest_entry: OnceCell::new(),
60        }
61    }
62
63    /// Will return the oldest operation log key in the database and cache the
64    /// result. If no entry exists yet the DB will be queried on each call till
65    /// an entry is present.
66    async fn get_oldest_operation_log_key(&self) -> Option<ChronologicalOperationLogKey> {
67        let mut dbtx = self.db.begin_transaction_nc().await;
68        self.oldest_entry
69            .get_or_try_init(move || async move {
70                dbtx.find_by_prefix(&ChronologicalOperationLogKeyPrefix)
71                    .await
72                    .map(|(key, ())| key)
73                    .next()
74                    .await
75                    .ok_or(())
76            })
77            .await
78            .ok()
79            .copied()
80    }
81
82    pub async fn add_operation_log_entry(
83        &self,
84        dbtx: &mut DatabaseTransaction<'_>,
85        operation_id: OperationId,
86        operation_type: &str,
87        operation_meta: impl serde::Serialize,
88    ) {
89        dbtx.insert_new_entry(
90            &OperationLogKey { operation_id },
91            &OperationLogEntry {
92                operation_module_kind: operation_type.to_string(),
93                meta: JsonStringed(
94                    serde_json::to_value(operation_meta)
95                        .expect("Can only fail if meta is not serializable"),
96                ),
97                outcome: None,
98            },
99        )
100        .await;
101        dbtx.insert_new_entry(
102            &ChronologicalOperationLogKey {
103                creation_time: now(),
104                operation_id,
105            },
106            &(),
107        )
108        .await;
109    }
110
111    #[deprecated(since = "0.6.0", note = "Use `paginate_operations_rev` instead")]
112    pub async fn list_operations(
113        &self,
114        limit: usize,
115        last_seen: Option<ChronologicalOperationLogKey>,
116    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
117        self.paginate_operations_rev(limit, last_seen).await
118    }
119
120    /// Returns the last `limit` operations. To fetch the next page, pass the
121    /// last operation's [`ChronologicalOperationLogKey`] as `start_after`.
122    pub async fn paginate_operations_rev(
123        &self,
124        limit: usize,
125        last_seen: Option<ChronologicalOperationLogKey>,
126    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
127        const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7);
128
129        let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey {
130            // We don't expect any operations from the future to exist, since SystemTime isn't
131            // monotone and CI can be overloaded at times we add a small buffer to avoid flakiness
132            // in tests.
133            creation_time: now() + Duration::from_secs(30),
134            operation_id: OperationId([0; 32]),
135        });
136
137        let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else {
138            return vec![];
139        };
140
141        let mut dbtx = self.db.begin_transaction_nc().await;
142        let mut operation_log_keys = Vec::with_capacity(limit);
143
144        // Find all the operation log keys in the requested window. Since we decided to
145        // not introduce a find_by_range_rev function we have to jump through some
146        // hoops, see also the comments in rev_epoch_ranges.
147        // TODO: Implement using find_by_range_rev if ever introduced
148        'outer: for key_range_rev in
149            rev_epoch_ranges(start_after_key, oldest_entry_key, EPOCH_DURATION)
150        {
151            let epoch_operation_log_keys_rev = dbtx
152                .find_by_range(key_range_rev)
153                .await
154                .map(|(key, ())| key)
155                .collect::<Vec<_>>()
156                .await;
157
158            for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() {
159                operation_log_keys.push(operation_log_key);
160                if operation_log_keys.len() >= limit {
161                    break 'outer;
162                }
163            }
164        }
165
166        debug_assert!(
167            operation_log_keys.iter().collect::<HashSet<_>>().len() == operation_log_keys.len(),
168            "Operation log keys returned are not unique"
169        );
170
171        let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len());
172        for operation_log_key in operation_log_keys {
173            let operation_log_entry = dbtx
174                .get_value(&OperationLogKey {
175                    operation_id: operation_log_key.operation_id,
176                })
177                .await
178                .expect("Inconsistent DB");
179            operation_log_entries.push((operation_log_key, operation_log_entry));
180        }
181
182        operation_log_entries
183    }
184
185    pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
186        Self::get_operation_inner(
187            &mut self.db.begin_transaction_nc().await.into_nc(),
188            operation_id,
189        )
190        .await
191    }
192
193    async fn get_operation_inner(
194        dbtx: &mut DatabaseTransaction<'_>,
195        operation_id: OperationId,
196    ) -> Option<OperationLogEntry> {
197        dbtx.get_value(&OperationLogKey { operation_id }).await
198    }
199
200    /// Sets the outcome of an operation
201    #[instrument(target = LOG_CLIENT, skip(db), level = "debug")]
202    pub async fn set_operation_outcome(
203        db: &Database,
204        operation_id: OperationId,
205        outcome: &(impl Serialize + Debug),
206    ) -> anyhow::Result<()> {
207        let outcome_json =
208            JsonStringed(serde_json::to_value(outcome).expect("Outcome is not serializable"));
209
210        let mut dbtx = db.begin_transaction().await;
211        let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id)
212            .await
213            .expect("Operation exists");
214        operation.outcome = Some(OperationOutcome {
215            time: fedimint_core::time::now(),
216            outcome: outcome_json,
217        });
218        dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
219            .await;
220        dbtx.commit_tx_result().await?;
221
222        Ok(())
223    }
224
225    /// Tries to set the outcome of an operation, but only logs an error if it
226    /// fails and does not return it. Since the outcome can always be recomputed
227    /// from an update stream, failing to save it isn't a problem in cases where
228    /// we do this merely for caching.
229    pub async fn optimistically_set_operation_outcome(
230        db: &Database,
231        operation_id: OperationId,
232        outcome: &(impl Serialize + Debug),
233    ) {
234        if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
235            warn!(
236                target: LOG_CLIENT,
237                "Error setting operation outcome: {e}"
238            );
239        }
240    }
241}
242
243/// Returns an iterator over the ranges of operation log keys, starting from the
244/// most recent range and going backwards in time till slightly later than
245/// `last_entry`.
246///
247/// Simplifying keys to integers and assuming a `start_after` of 100, a
248/// `last_entry` of 55 and an `epoch_duration` of 10 the ranges would be:
249/// ```text
250/// [90..100, 80..90, 70..80, 60..70, 50..60]
251/// ```
252fn rev_epoch_ranges(
253    start_after: ChronologicalOperationLogKey,
254    last_entry: ChronologicalOperationLogKey,
255    epoch_duration: Duration,
256) -> impl Iterator<Item = Range<ChronologicalOperationLogKey>> {
257    // We want to fetch all operations that were created before `start_after`, going
258    // backwards in time. This means "start" generally means a later time than
259    // "end". Only when creating a rust Range we have to swap the terminology (see
260    // comment there).
261    (0..)
262        .map(move |epoch| start_after.creation_time - epoch * epoch_duration)
263        // We want to get all operation log keys in the range [last_key, start_after). So as
264        // long as the start time is greater than the last key's creation time, we have to
265        // keep going.
266        .take_while(move |&start_time| start_time >= last_entry.creation_time)
267        .map(move |start_time| {
268            let end_time = start_time - epoch_duration;
269
270            // In the edge case that there were two events logged at exactly the same time
271            // we need to specify the correct operation_id for the first key. Otherwise, we
272            // could miss entries.
273            let start_key = if start_time == start_after.creation_time {
274                start_after
275            } else {
276                ChronologicalOperationLogKey {
277                    creation_time: start_time,
278                    operation_id: OperationId([0; 32]),
279                }
280            };
281
282            // We could also special-case the last key here, but it's not necessary, making
283            // it last_key if end_time < last_key.creation_time. We know there are no
284            // entries beyond last_key though, so the range query will be equivalent either
285            // way.
286            let end_key = ChronologicalOperationLogKey {
287                creation_time: end_time,
288                operation_id: OperationId([0; 32]),
289            };
290
291            // We want to go backwards using a forward range query. This means we have to
292            // swap the start and end keys and then reverse the vector returned by the
293            // query.
294            Range {
295                start: end_key,
296                end: start_key,
297            }
298        })
299}
300
301/// V0 version of operation log entry for migration purposes
302#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
303pub struct OperationLogEntryV0 {
304    pub(crate) operation_module_kind: String,
305    pub(crate) meta: JsonStringed,
306    pub(crate) outcome: Option<JsonStringed>,
307}
308
309/// Represents the outcome of an operation, combining both the outcome value and
310/// its timestamp
311#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
312pub(crate) struct OperationOutcome {
313    pub(crate) time: SystemTime,
314    pub(crate) outcome: JsonStringed,
315}
316
317/// Represents an operation triggered by a user, typically related to sending or
318/// receiving money.
319///
320/// There are three levels of introspection possible for `OperationLogEntry`s:
321///   1. The [`OperationLogEntry::operation_module_kind`] function returns the
322///      kind of the module that created the operation.
323///   2. The [`OperationLogEntry::meta`] function returns static meta data that
324///      was associated with the operation when it was created. Modules define
325///      their own meta structures, so the module kind has to be used to
326///      determine the structure of the meta data.
327///   3. To find out the current state of the operation there is a two-step
328///      process:
329///      * First, the [`OperationLogEntry::outcome`] function returns the
330///        outcome if the operation finished **and** the update subscription
331///        stream has been processed till its end at least once.
332///      * If that isn't the case, the [`OperationLogEntry::outcome`] method
333///        will return `None` and the appropriate update subscription function
334///        has to be called. See the respective client extension trait for these
335///        functions.
336#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
337pub struct OperationLogEntry {
338    pub(crate) operation_module_kind: String,
339    pub(crate) meta: JsonStringed,
340    // TODO: probably change all that JSON to Dyn-types
341    pub(crate) outcome: Option<OperationOutcome>,
342}
343
344impl OperationLogEntry {
345    /// Returns the kind of the module that generated the operation
346    pub fn operation_module_kind(&self) -> &str {
347        &self.operation_module_kind
348    }
349
350    /// Returns the meta data of the operation. This is a JSON value that can be
351    /// either returned as a [`serde_json::Value`] or deserialized into a
352    /// specific type. The specific type should be named `<Module>OperationMeta`
353    /// in the module's client crate. The module can be determined by calling
354    /// [`OperationLogEntry::operation_module_kind`].
355    pub fn meta<M: DeserializeOwned>(&self) -> M {
356        serde_json::from_value(self.meta.0.clone()).expect("JSON deserialization should not fail")
357    }
358
359    /// Returns the last state update of the operation, if any was cached yet.
360    /// If this hasn't been the case yet and `None` is returned subscribe to the
361    /// appropriate update stream.
362    ///
363    /// ## Determining the return type
364    /// [`OperationLogEntry::meta`] should tell you the which operation type of
365    /// a given module the outcome belongs to. The operation type will have a
366    /// corresponding `async fn subscribe_type(&self, operation_id:
367    /// OperationId) -> anyhow::Result<UpdateStreamOrOutcome<TypeState>>;`
368    /// function that returns a `UpdateStreamOrOutcome<S>` where `S` is the
369    /// high-level state the operation is in. If this state is terminal, i.e.
370    /// the stream closes after returning it, it will be cached as the `outcome`
371    /// of the operation.
372    ///
373    /// This means the type to be used for deserializing the outcome is `S`,
374    /// often called `<OperationType>State`. Alternatively one can also use
375    /// [`serde_json::Value`] to get the unstructured data.
376    pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
377        self.outcome.as_ref().map(|outcome| {
378            serde_json::from_value(outcome.outcome.0.clone())
379                .expect("JSON deserialization should not fail")
380        })
381    }
382
383    /// Returns the time when the outcome was cached.
384    pub fn outcome_time(&self) -> Option<SystemTime> {
385        self.outcome.as_ref().map(|o| o.time)
386    }
387
388    /// Returns an a [`UpdateStreamOrOutcome`] enum that can be converted into
389    /// an update stream for easier handling using
390    /// [`UpdateStreamOrOutcome::into_stream`] but can also be matched over to
391    /// shortcut the handling of final outcomes.
392    pub fn outcome_or_updates<U, S>(
393        &self,
394        db: &Database,
395        operation_id: OperationId,
396        stream_gen: impl FnOnce() -> S,
397    ) -> UpdateStreamOrOutcome<U>
398    where
399        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
400        S: Stream<Item = U> + MaybeSend + 'static,
401    {
402        match self.outcome::<U>() {
403            Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
404            None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
405                db.clone(),
406                operation_id,
407                stream_gen(),
408            )),
409        }
410    }
411}
412
413/// Either a stream of operation updates if the operation hasn't finished yet or
414/// its outcome otherwise.
415pub enum UpdateStreamOrOutcome<U> {
416    UpdateStream(BoxStream<'static, U>),
417    Outcome(U),
418}
419
420impl<U> UpdateStreamOrOutcome<U>
421where
422    U: MaybeSend + MaybeSync + 'static,
423{
424    /// Returns a stream no matter if the operation is finished. If there
425    /// already is a cached outcome the stream will only return that, otherwise
426    /// all updates will be returned until the operation finishes.
427    pub fn into_stream(self) -> BoxStream<'static, U> {
428        match self {
429            UpdateStreamOrOutcome::UpdateStream(stream) => stream,
430            UpdateStreamOrOutcome::Outcome(outcome) => {
431                Box::pin(stream::once(future::ready(outcome)))
432            }
433        }
434    }
435}
436
437/// Wraps an operation update stream such that the last update before it closes
438/// is tried to be written to the operation log entry as its outcome.
439pub fn caching_operation_update_stream<'a, U, S>(
440    db: Database,
441    operation_id: OperationId,
442    stream: S,
443) -> BoxStream<'a, U>
444where
445    U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
446    S: Stream<Item = U> + MaybeSend + 'a,
447{
448    let mut stream = Box::pin(stream);
449    Box::pin(stream! {
450        let mut last_update = None;
451        while let Some(update) = stream.next().await {
452            yield update.clone();
453            last_update = Some(update);
454        }
455
456        let Some(last_update) = last_update else {
457            error!(
458                target: LOG_CLIENT,
459                "Stream ended without any updates, this should not happen!"
460            );
461            return;
462        };
463
464        OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
465    })
466}
467
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use fedimint_core::core::OperationId;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::{
        Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, IRawDatabaseExt,
    };
    use fedimint_core::module::registry::ModuleRegistry;
    use futures::stream::StreamExt;
    use serde::{Deserialize, Serialize};

    use super::UpdateStreamOrOutcome;
    use crate::db::{ChronologicalOperationLogKey, OperationLogKey};
    use crate::oplog::{JsonStringed, OperationLog, OperationLogEntry, OperationOutcome};

    /// Unit meta data of an entry without outcome can be read back.
    #[test]
    fn test_operation_log_entry_serde() {
        let entry = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: JsonStringed(serde_json::to_value(()).unwrap()),
            // Test with outcome = None
            outcome: None,
        };

        entry.meta::<()>();
    }

    /// Structured meta data containing a nested JSON value round-trips.
    #[test]
    fn test_operation_log_entry_serde_extra_meta() {
        #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
        struct Meta {
            foo: String,
            extra_meta: serde_json::Value,
        }

        let expected_meta = Meta {
            foo: "bar".to_string(),
            extra_meta: serde_json::to_value(()).unwrap(),
        };

        let outcome = OperationOutcome {
            time: fedimint_core::time::now(),
            outcome: JsonStringed(serde_json::to_value("test_outcome").unwrap()),
        };
        let entry = OperationLogEntry {
            operation_module_kind: "test".to_string(),
            meta: JsonStringed(serde_json::to_value(expected_meta.clone()).unwrap()),
            outcome: Some(outcome),
        };

        assert_eq!(entry.meta::<Meta>(), expected_meta);
    }

    /// Setting an outcome explicitly makes it visible through all accessors.
    #[tokio::test]
    async fn test_operation_log_update() {
        let op_id = OperationId([0x32; 32]);

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        // A freshly added operation has no outcome yet
        let fresh_op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(fresh_op.outcome, None);

        OperationLog::set_operation_outcome(&db, op_id, &"baz")
            .await
            .unwrap();

        let finished_op = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(finished_op.outcome::<String>(), Some("baz".to_string()));
        assert!(
            finished_op.outcome_time().is_some(),
            "outcome_time should be set"
        );

        // With a cached outcome the stream generator result must not be used
        let update_stream_or_outcome =
            finished_op.outcome_or_updates::<String, _>(&db, op_id, futures::stream::empty);

        assert!(matches!(
            &update_stream_or_outcome,
            UpdateStreamOrOutcome::Outcome(s) if s == "baz"
        ));

        let updates: Vec<_> = update_stream_or_outcome.into_stream().collect().await;
        assert_eq!(updates, vec!["baz"]);
    }

    /// Draining the update stream caches its last item as the outcome.
    #[tokio::test]
    async fn test_operation_log_update_from_stream() {
        let op_id = OperationId([0x32; 32]);

        let db = MemDatabase::new().into_database();
        let op_log = OperationLog::new(db.clone());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let op = op_log.get_operation(op_id).await.expect("op exists");

        let updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()];
        let received_updates: Vec<_> = op
            .outcome_or_updates::<String, _>(&db, op_id, || futures::stream::iter(updates.clone()))
            .into_stream()
            .collect()
            .await;
        assert_eq!(received_updates, updates);

        let op_updated = op_log.get_operation(op_id).await.expect("op exists");
        assert_eq!(op_updated.outcome::<String>(), Some("baz".to_string()));
        assert!(
            op_updated.outcome_time().is_some(),
            "outcome_time should be set after stream completion"
        );
    }

    /// 98 operations are returned newest-first in nine full pages and one
    /// partial page.
    #[tokio::test]
    async fn test_pagination() {
        // The meta of operation `i` is `i`, so the newest operation has meta 97 and
        // the metas count down across pages.
        fn assert_page_entries(
            page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>,
            page_idx: u8,
        ) {
            let newest_on_page = 97 - page_idx * 10;
            for (offset, (_key, entry)) in (0u8..).zip(page) {
                assert_eq!(entry.meta::<u8>(), newest_on_page - offset);
            }
        }

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        for operation_idx in 0u8..98 {
            let mut dbtx = db.begin_transaction().await;
            op_log
                .add_operation_log_entry(
                    &mut dbtx.to_ref_nc(),
                    OperationId([operation_idx; 32]),
                    "foo",
                    operation_idx,
                )
                .await;
            dbtx.commit_tx().await;
        }

        let mut cursor = None;
        for page_idx in 0u8..9 {
            let page = op_log.paginate_operations_rev(10, cursor).await;
            assert_eq!(page.len(), 10);
            cursor = page.last().map(|(key, _)| *key);
            assert_page_entries(page, page_idx);
        }

        let last_page = op_log.paginate_operations_rev(10, cursor).await;
        assert_eq!(last_page.len(), 8);
        assert_page_entries(last_page, 9);
    }

    /// Paginating an empty log yields an empty page.
    #[tokio::test]
    async fn test_pagination_empty() {
        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        assert!(op_log.paginate_operations_rev(10, None).await.is_empty());
    }

    /// Operations sharing a creation time, as well as operations spread over
    /// many epochs, are paginated without gaps or duplicates.
    #[tokio::test]
    async fn test_pagination_multiple_operations_same_time() {
        // Inserts operation `idx` with a creation time of `time` days after the
        // base time.
        async fn insert_oplog(dbtx: &mut DatabaseTransaction<'_>, idx: u8, time: u64) {
            const SECS_PER_DAY: u64 = 60 * 60 * 24;

            let operation_id = OperationId([idx; 32]);
            // Some time in the 2010s
            let creation_time = SystemTime::UNIX_EPOCH
                + Duration::from_secs(SECS_PER_DAY * 365 * 40)
                + Duration::from_secs(time * SECS_PER_DAY);

            let entry = OperationLogEntry {
                operation_module_kind: "operation_type".to_string(),
                meta: JsonStringed(serde_json::Value::Null),
                outcome: None,
            };
            dbtx.insert_new_entry(&OperationLogKey { operation_id }, &entry)
                .await;

            let chronological_key = ChronologicalOperationLogKey {
                creation_time,
                operation_id,
            };
            dbtx.insert_new_entry(&chronological_key, &()).await;
        }

        // Walks through the log page by page and compares the operation ids of
        // each page against the expected indices.
        async fn assert_pages(operation_log: &OperationLog, pages: Vec<Vec<u8>>) {
            let mut cursor: Option<ChronologicalOperationLogKey> = None;
            for expected_indices in pages {
                let page = operation_log.paginate_operations_rev(10, cursor).await;
                assert_eq!(page.len(), expected_indices.len());

                let actual_ids: Vec<_> = page.iter().map(|(key, _)| key.operation_id).collect();
                let expected_ids: Vec<_> = expected_indices
                    .iter()
                    .map(|&idx| OperationId([idx; 32]))
                    .collect();
                assert_eq!(actual_ids, expected_ids);

                cursor = page.last().map(|(key, _)| *key);
            }
        }

        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        // Ten operations, all at the same time
        let mut dbtx = db.begin_transaction().await;
        for operation_idx in 0u8..10 {
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 1).await;
        }
        dbtx.commit_tx().await;
        assert_pages(&op_log, vec![vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0], vec![]]).await;

        // Two more groups of six operations each, one and two days later
        let mut dbtx = db.begin_transaction().await;
        for operation_idx in 10u8..16 {
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 2).await;
        }
        for operation_idx in 16u8..22 {
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 3).await;
        }
        dbtx.commit_tx().await;
        assert_pages(
            &op_log,
            vec![
                vec![21, 20, 19, 18, 17, 16, 15, 14, 13, 12],
                vec![11, 10, 9, 8, 7, 6, 5, 4, 3, 2],
                vec![1, 0],
                vec![],
            ],
        )
        .await;

        // 9 times one operation every 10 days
        let mut dbtx = db.begin_transaction().await;
        for operation_idx in 22u8..31 {
            let days = 10 * u64::from(operation_idx);
            insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, days).await;
        }
        dbtx.commit_tx().await;
        assert_pages(
            &op_log,
            vec![
                vec![30, 29, 28, 27, 26, 25, 24, 23, 22, 21],
                vec![20, 19, 18, 17, 16, 15, 14, 13, 12, 11],
                vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
                vec![0],
                vec![],
            ],
        )
        .await;
    }

    /// An empty first page must not prevent later pages from seeing entries
    /// added afterwards.
    #[tokio::test]
    async fn test_pagination_empty_then_not() {
        let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
        let op_log = OperationLog::new(db.clone());

        let empty_page = op_log.paginate_operations_rev(10, None).await;
        assert!(empty_page.is_empty());

        let mut dbtx = db.begin_transaction().await;
        op_log
            .add_operation_log_entry(&mut dbtx.to_ref_nc(), OperationId([0; 32]), "foo", "bar")
            .await;
        dbtx.commit_tx().await;

        let filled_page = op_log.paginate_operations_rev(10, None).await;
        assert_eq!(filled_page.len(), 1);
    }
}