fedimint_core/db/
mod.rs

1//! Core Fedimint database traits and types
2//!
3//! This module provides the core key-value database for Fedimint.
4//!
5//! # Usage
6//!
7//! To use the database, you typically follow these steps:
8//!
9//! 1. Create a `Database` instance
10//! 2. Begin a transaction
11//! 3. Perform operations within the transaction
12//! 4. Commit the transaction
13//!
14//! ## Example
15//!
16//! ```rust
17//! use fedimint_core::db::mem_impl::MemDatabase;
18//! use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19//! use fedimint_core::encoding::{Decodable, Encodable};
20//! use fedimint_core::impl_db_record;
21//! use fedimint_core::module::registry::ModuleDecoderRegistry;
22//!
23//! #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
24//! pub struct TestKey(pub u64);
25//! #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
26//! pub struct TestVal(pub u64);
27//!
28//! #[repr(u8)]
29//! #[derive(Clone)]
30//! pub enum TestDbKeyPrefix {
31//!     Test = 0x42,
32//! }
33//!
34//! impl_db_record!(
35//!     key = TestKey,
36//!     value = TestVal,
37//!     db_prefix = TestDbKeyPrefix::Test,
38//! );
39//!
40//! # async fn example() {
41//! // Create a new in-memory database
42//! let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
43//!
44//! // Begin a transaction
45//! let mut tx = db.begin_transaction().await;
46//!
47//! // Perform operations
48//! tx.insert_entry(&TestKey(1), &TestVal(100)).await;
49//! let value = tx.get_value(&TestKey(1)).await;
50//!
51//! // Commit the transaction
52//! tx.commit_tx().await;
53//!
54//! // For operations that may need to be retried due to conflicts, use the
55//! // `autocommit` function:
56//!
57//! db.autocommit(
58//!     |dbtx, _| {
59//!         Box::pin(async move {
60//!             dbtx.insert_entry(&TestKey(1), &TestVal(100)).await;
61//!             anyhow::Ok(())
62//!         })
63//!     },
64//!     None,
65//! )
66//! .await
67//! .unwrap();
68//! # }
69//! ```
70//!
71//! # Isolation of database transactions
72//!
73//! Fedimint requires that the database implementation implement Snapshot
74//! Isolation. Snapshot Isolation is a database isolation level that guarantees
75//! consistent reads from the time that the snapshot was created (at transaction
76//! creation time). Transactions with Snapshot Isolation level will only commit
77//! if there has been no write to the modified keys since the snapshot (i.e.
78//! write-write conflicts are prevented).
79//!
80//! Specifically, Fedimint expects the database implementation to prevent the
81//! following anomalies:
82//!
83//! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
84//! at time (t + i)
85//!
86//! Dirty Read: TX1 is able to read TX2's uncommitted writes.
87//!
88//! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
89//! time (t + i) where V1 != V2.
90//!
91//! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
92//! retrieves Y number of records for the same prefix at time (t + i).
93//!
94//! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
95//! overwrites V1 as the value for K1 (write-write conflict).
96//!
97//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom
98//! Record | Lost Writes | | -------- | ------------------ | ---------- |
99//! ------------------- | -------------- | ----------- | | MemoryDB | Prevented
100//! | Prevented  | Prevented           | Prevented      | Possible    |
101//! | RocksDB  | Prevented          | Prevented  | Prevented           |
102//! Prevented      | Prevented   | | Sqlite   | Prevented          | Prevented
103//! | Prevented           | Prevented      | Prevented   |
104
105use std::any;
106use std::collections::BTreeMap;
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{bail, Context, Result};
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use futures::{Stream, StreamExt};
120use macro_rules_attribute::apply;
121use rand::Rng;
122use serde::Serialize;
123use strum_macros::EnumIter;
124use thiserror::Error;
125use tracing::{debug, error, info, instrument, trace, warn};
126
127use crate::core::ModuleInstanceId;
128use crate::encoding::{Decodable, Encodable};
129use crate::fmt_utils::AbbreviateHexBytes;
130use crate::task::{MaybeSend, MaybeSync};
131use crate::{async_trait_maybe_send, maybe_add_send, timing};
132
133pub mod mem_impl;
134pub mod notifications;
135
136pub use test_utils::*;
137
138use self::notifications::{Notifications, NotifyQueue};
139use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
140
141pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
142
143pub trait DatabaseKeyPrefix: Debug {
144    fn to_bytes(&self) -> Vec<u8>;
145}
146
147/// A key + value pair in the database with a unique prefix
148/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
149pub trait DatabaseRecord: DatabaseKeyPrefix {
150    const DB_PREFIX: u8;
151    const NOTIFY_ON_MODIFY: bool = false;
152    type Key: DatabaseKey + Debug;
153    type Value: DatabaseValue + Debug;
154}
155
156/// A key that can be used to query one or more `DatabaseRecord`
157/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
158pub trait DatabaseLookup: DatabaseKeyPrefix {
159    type Record: DatabaseRecord;
160}
161
162// Every `DatabaseRecord` is automatically a `DatabaseLookup`
163impl<Record> DatabaseLookup for Record
164where
165    Record: DatabaseRecord + Debug + Decodable + Encodable,
166{
167    type Record = Record;
168}
169
170/// `DatabaseKey` that represents the lookup structure for retrieving key/value
171/// pairs from the database.
172pub trait DatabaseKey: Sized {
173    /// Send a notification to tasks waiting to be notified if the value of
174    /// `DatabaseKey` is modified
175    ///
176    /// For instance, this can be used to be notified when a key in the
177    /// database is created. It is also possible to run a closure with the
178    /// value of the `DatabaseKey` as parameter to verify some changes to
179    /// that value.
180    const NOTIFY_ON_MODIFY: bool = false;
181    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
182}
183
184/// Marker trait for `DatabaseKey`s where `NOTIFY` is true
185pub trait DatabaseKeyWithNotify {}
186
187/// `DatabaseValue` that represents the value structure of database records.
188pub trait DatabaseValue: Sized + Debug {
189    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
190    fn to_bytes(&self) -> Vec<u8>;
191}
192
193pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
194
195/// Just ignore this type, it's only there to make compiler happy
196///
197/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
198pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
199
200/// Error returned when the autocommit function fails
201#[derive(Debug, Error)]
202pub enum AutocommitError<E> {
203    /// Committing the transaction failed too many times, giving up
204    #[error("Commit Failed: {last_error}")]
205    CommitFailed {
206        /// Number of attempts
207        attempts: usize,
208        /// Last error on commit
209        last_error: anyhow::Error,
210    },
211    /// Error returned by the closure provided to `autocommit`. If returned no
212    /// commit was attempted in that round
213    #[error("Closure error: {error}")]
214    ClosureError {
215        /// The attempt on which the closure returned an error
216        ///
217        /// Values other than 0 typically indicate a logic error since the
218        /// closure given to `autocommit` should not have side effects
219        /// and thus keep succeeding if it succeeded once.
220        attempts: usize,
221        /// Error returned by the closure
222        error: E,
223    },
224}
225
226/// Raw database implementation
227///
228/// This and [`IRawDatabaseTransaction`] are meant to be implemented
229/// by crates like `fedimint-rocksdb` to provide a concrete implementation
230/// of a database to be used by Fedimint.
231///
232/// This is in contrast of [`IDatabase`] which includes extra
233/// functionality that Fedimint needs (and adds) on top of it.
234#[apply(async_trait_maybe_send!)]
235pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
236    /// A raw database transaction type
237    type Transaction<'a>: IRawDatabaseTransaction + Debug;
238
239    /// Start a database transaction
240    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
241
242    // Checkpoint the database to a backup directory
243    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
244}
245
246#[apply(async_trait_maybe_send!)]
247impl<T> IRawDatabase for Box<T>
248where
249    T: IRawDatabase,
250{
251    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
252
253    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
254        (**self).begin_transaction().await
255    }
256
257    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
258        (**self).checkpoint(backup_path)
259    }
260}
261
262/// An extension trait with convenience operations on [`IRawDatabase`]
263pub trait IRawDatabaseExt: IRawDatabase + Sized {
264    /// Convert to type implementing [`IRawDatabase`] into [`Database`].
265    ///
266    /// When type inference is not an issue, [`Into::into`] can be used instead.
267    fn into_database(self) -> Database {
268        Database::new(self, ModuleRegistry::default())
269    }
270}
271
272impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
273
274impl<T> From<T> for Database
275where
276    T: IRawDatabase,
277{
278    fn from(raw: T) -> Self {
279        Self::new(raw, ModuleRegistry::default())
280    }
281}
282
283/// A database that on top of a raw database operation, implements
284/// key notification system.
285#[apply(async_trait_maybe_send!)]
286pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
287    /// Start a database transaction
288    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
289    /// Register (and wait) for `key` updates
290    async fn register(&self, key: &[u8]);
291    /// Notify about `key` update (creation, modification, deletion)
292    async fn notify(&self, key: &[u8]);
293
294    /// The prefix len of this database refers to the global (as opposed to
295    /// module-isolated) key space
296    fn is_global(&self) -> bool;
297
298    /// Checkpoints the database to a backup directory
299    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
300}
301
302#[apply(async_trait_maybe_send!)]
303impl<T> IDatabase for Arc<T>
304where
305    T: IDatabase + ?Sized,
306{
307    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
308        (**self).begin_transaction().await
309    }
310    async fn register(&self, key: &[u8]) {
311        (**self).register(key).await;
312    }
313    async fn notify(&self, key: &[u8]) {
314        (**self).notify(key).await;
315    }
316
317    fn is_global(&self) -> bool {
318        (**self).is_global()
319    }
320
321    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
322        (**self).checkpoint(backup_path)
323    }
324}
325
326/// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
327///
328/// Mostly notification system, but also run-time single-commit handling.
329struct BaseDatabase<RawDatabase> {
330    notifications: Arc<Notifications>,
331    raw: RawDatabase,
332}
333
334impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        f.write_str("BaseDatabase")
337    }
338}
339
340#[apply(async_trait_maybe_send!)]
341impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
342    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
343        Box::new(BaseDatabaseTransaction::new(
344            self.raw.begin_transaction().await,
345            self.notifications.clone(),
346        ))
347    }
348    async fn register(&self, key: &[u8]) {
349        self.notifications.register(key).await;
350    }
351    async fn notify(&self, key: &[u8]) {
352        self.notifications.notify(key);
353    }
354
355    fn is_global(&self) -> bool {
356        true
357    }
358
359    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
360        self.raw.checkpoint(backup_path)
361    }
362}
363
364/// A public-facing newtype over `IDatabase`
365///
366/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
367/// and implements common utility function for auto-commits, db isolation,
368/// and other.
369#[derive(Clone, Debug)]
370pub struct Database {
371    inner: Arc<dyn IDatabase + 'static>,
372    module_decoders: ModuleDecoderRegistry,
373}
374
375impl Database {
376    pub fn strong_count(&self) -> usize {
377        Arc::strong_count(&self.inner)
378    }
379
380    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
381        self.inner
382    }
383}
384
385impl Database {
386    /// Creates a new Fedimint database from any object implementing
387    /// [`IDatabase`].
388    ///
389    /// See also [`Database::new_from_arc`].
390    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
391        let inner = BaseDatabase {
392            raw,
393            notifications: Arc::new(Notifications::new()),
394        };
395        Self::new_from_arc(
396            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
397            module_decoders,
398        )
399    }
400
401    /// Create [`Database`] from an already typed-erased `IDatabase`.
402    pub fn new_from_arc(
403        inner: Arc<dyn IDatabase + 'static>,
404        module_decoders: ModuleDecoderRegistry,
405    ) -> Self {
406        Self {
407            inner,
408            module_decoders,
409        }
410    }
411
412    /// Create [`Database`] isolated to a partition with a given `prefix`
413    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
414        Self {
415            inner: Arc::new(PrefixDatabase {
416                inner: self.inner.clone(),
417                global_dbtx_access_token: None,
418                prefix,
419            }),
420            module_decoders: self.module_decoders.clone(),
421        }
422    }
423
424    /// Create [`Database`] isolated to a partition with a prefix for a given
425    /// `module_instance_id`, allowing the module to access `global_dbtx` with
426    /// the right `access_token`
427    pub fn with_prefix_module_id(
428        &self,
429        module_instance_id: ModuleInstanceId,
430    ) -> (Self, GlobalDBTxAccessToken) {
431        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
432        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
433        (
434            Self {
435                inner: Arc::new(PrefixDatabase {
436                    inner: self.inner.clone(),
437                    global_dbtx_access_token: Some(global_dbtx_access_token),
438                    prefix,
439                }),
440                module_decoders: self.module_decoders.clone(),
441            },
442            global_dbtx_access_token,
443        )
444    }
445
446    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
447        Self {
448            inner: self.inner.clone(),
449            module_decoders,
450        }
451    }
452
453    /// Is this `Database` a global, unpartitioned `Database`
454    pub fn is_global(&self) -> bool {
455        self.inner.is_global()
456    }
457
458    /// `Err` if [`Self::is_global`] is not true
459    pub fn ensure_global(&self) -> Result<()> {
460        if !self.is_global() {
461            bail!("Database instance not global");
462        }
463
464        Ok(())
465    }
466
467    /// `Err` if [`Self::is_global`] is true
468    pub fn ensure_isolated(&self) -> Result<()> {
469        if self.is_global() {
470            bail!("Database instance not isolated");
471        }
472
473        Ok(())
474    }
475
476    /// Begin a new committable database transaction
477    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
478    where
479        's: 'tx,
480    {
481        DatabaseTransaction::<Committable>::new(
482            self.inner.begin_transaction().await,
483            self.module_decoders.clone(),
484        )
485    }
486
487    /// Begin a new non-committable database transaction
488    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
489    where
490        's: 'tx,
491    {
492        self.begin_transaction().await.into_nc()
493    }
494
495    pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
496        self.inner.checkpoint(backup_path)
497    }
498
499    /// Runs a closure with a reference to a database transaction and tries to
500    /// commit the transaction if the closure returns `Ok` and rolls it back
501    /// otherwise. If committing fails the closure is run for up to
502    /// `max_attempts` times. If `max_attempts` is `None` it will run
503    /// `usize::MAX` times which is close enough to infinite times.
504    ///
505    /// The closure `tx_fn` provided should not have side effects outside of the
506    /// database transaction provided, or if it does these should be
507    /// idempotent, since the closure might be run multiple times.
508    ///
509    /// # Lifetime Parameters
510    ///
511    /// The higher rank trait bound (HRTB) `'a` that is applied to the the
512    /// mutable reference to the database transaction ensures that the
513    /// reference lives as least as long as the returned future of the
514    /// closure.
515    ///
516    /// Further, the reference to self (`'s`) must outlive the
517    /// `DatabaseTransaction<'dt>`. In other words, the
518    /// `DatabaseTransaction` must live as least as long as `self` and that is
519    /// true as the `DatabaseTransaction` is only dropped at the end of the
520    /// `loop{}`.
521    ///
522    /// # Panics
523    ///
524    /// This function panics when the given number of maximum attempts is zero.
525    /// `max_attempts` must be greater or equal to one.
526    pub async fn autocommit<'s, 'dbtx, F, T, E>(
527        &'s self,
528        tx_fn: F,
529        max_attempts: Option<usize>,
530    ) -> Result<T, AutocommitError<E>>
531    where
532        's: 'dbtx,
533        for<'r, 'o> F: Fn(
534            &'r mut DatabaseTransaction<'o>,
535            PhantomBound<'dbtx, 'o>,
536        ) -> BoxFuture<'r, Result<T, E>>,
537    {
538        assert_ne!(max_attempts, Some(0));
539        let mut curr_attempts: usize = 0;
540
541        loop {
542            // The `checked_add()` function is used to catch the `usize` overflow.
543            // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
544            // after ~50 days. But if that's the case, something else must be wrong.
545            // With `usize=64bit` it would take much longer, obviously.
546            curr_attempts = curr_attempts
547                .checked_add(1)
548                .expect("db autocommit attempt counter overflowed");
549
550            let mut dbtx = self.begin_transaction().await;
551
552            let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
553            let val = match tx_fn_res {
554                Ok(val) => val,
555                Err(err) => {
556                    dbtx.ignore_uncommitted();
557                    return Err(AutocommitError::ClosureError {
558                        attempts: curr_attempts,
559                        error: err,
560                    });
561                }
562            };
563
564            let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");
565
566            match dbtx.commit_tx_result().await {
567                Ok(()) => {
568                    return Ok(val);
569                }
570                Err(err) => {
571                    if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
572                        warn!(
573                            target: LOG_DB,
574                            curr_attempts,
575                            ?err,
576                            "Database commit failed in an autocommit block - terminating"
577                        );
578                        return Err(AutocommitError::CommitFailed {
579                            attempts: curr_attempts,
580                            last_error: err,
581                        });
582                    }
583
584                    let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
585                    let delay = rand::thread_rng().gen_range(delay..(2 * delay));
586                    warn!(
587                        target: LOG_DB,
588                        curr_attempts,
589                        %err,
590                        delay_ms = %delay,
591                        "Database commit failed in an autocommit block - retrying"
592                    );
593                    crate::runtime::sleep(Duration::from_millis(delay)).await;
594                }
595            }
596        }
597    }
598
599    /// Waits for key to be notified.
600    ///
601    /// Calls the `checker` when value of the key may have changed.
602    /// Returns the value when `checker` returns a `Some(T)`.
603    pub async fn wait_key_check<'a, K, T>(
604        &'a self,
605        key: &K,
606        checker: impl Fn(Option<K::Value>) -> Option<T>,
607    ) -> (T, DatabaseTransaction<'a, Committable>)
608    where
609        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
610    {
611        let key_bytes = key.to_bytes();
612        loop {
613            // register for notification
614            let notify = self.inner.register(&key_bytes);
615
616            // check for value in db
617            let mut tx = self.inner.begin_transaction().await;
618
619            let maybe_value_bytes = tx
620                .raw_get_bytes(&key_bytes)
621                .await
622                .expect("Unrecoverable error when reading from database")
623                .map(|value_bytes| {
624                    decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
625                });
626
627            if let Some(value) = checker(maybe_value_bytes) {
628                return (
629                    value,
630                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
631                );
632            }
633
634            // key not found, try again
635            notify.await;
636            // if miss a notification between await and next register, it is
637            // fine. because we are going check the database
638        }
639    }
640
641    /// Waits for key to be present in database.
642    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
643    where
644        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
645    {
646        self.wait_key_check(key, std::convert::identity).await.0
647    }
648}
649
650fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
651    let mut prefix = vec![MODULE_GLOBAL_PREFIX];
652    module_instance_id
653        .consensus_encode(&mut prefix)
654        .expect("Error encoding module instance id as prefix");
655    prefix
656}
657
658/// A database that wraps an `inner` one and adds a prefix to all operations,
659/// effectively creating an isolated partition.
660#[derive(Clone, Debug)]
661struct PrefixDatabase<Inner>
662where
663    Inner: Debug,
664{
665    prefix: Vec<u8>,
666    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
667    inner: Inner,
668}
669
670impl<Inner> PrefixDatabase<Inner>
671where
672    Inner: Debug,
673{
674    // TODO: we should optimize these concatenations, maybe by having an internal
675    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
676    // something
677    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
678        let mut full_key = self.prefix.clone();
679        full_key.extend_from_slice(key);
680        full_key
681    }
682}
683
684#[apply(async_trait_maybe_send!)]
685impl<Inner> IDatabase for PrefixDatabase<Inner>
686where
687    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
688{
689    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
690        Box::new(PrefixDatabaseTransaction {
691            inner: self.inner.begin_transaction().await,
692            global_dbtx_access_token: self.global_dbtx_access_token,
693            prefix: self.prefix.clone(),
694        })
695    }
696    async fn register(&self, key: &[u8]) {
697        self.inner.register(&self.get_full_key(key)).await;
698    }
699
700    async fn notify(&self, key: &[u8]) {
701        self.inner.notify(&self.get_full_key(key)).await;
702    }
703
704    fn is_global(&self) -> bool {
705        if self.global_dbtx_access_token.is_some() {
706            false
707        } else {
708            self.inner.is_global()
709        }
710    }
711
712    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
713        self.inner.checkpoint(backup_path)
714    }
715}
716
717/// A database transactions that wraps an `inner` one and adds a prefix to all
718/// operations, effectively creating an isolated partition.
719///
720/// Produced by [`PrefixDatabase`].
721#[derive(Debug)]
722struct PrefixDatabaseTransaction<Inner> {
723    inner: Inner,
724    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
725    prefix: Vec<u8>,
726}
727
728impl<Inner> PrefixDatabaseTransaction<Inner> {
729    // TODO: we should optimize these concatenations, maybe by having an internal
730    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
731    // something
732    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
733        let mut full_key = self.prefix.clone();
734        full_key.extend_from_slice(key);
735        full_key
736    }
737
738    fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
739        Range {
740            start: self.get_full_key(range.start),
741            end: self.get_full_key(range.end),
742        }
743    }
744
745    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
746        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
747    }
748}
749
750#[apply(async_trait_maybe_send!)]
751impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
752where
753    Inner: IDatabaseTransaction,
754{
755    async fn commit_tx(&mut self) -> Result<()> {
756        self.inner.commit_tx().await
757    }
758
759    fn is_global(&self) -> bool {
760        if self.global_dbtx_access_token.is_some() {
761            false
762        } else {
763            self.inner.is_global()
764        }
765    }
766
767    fn global_dbtx(
768        &mut self,
769        access_token: GlobalDBTxAccessToken,
770    ) -> &mut dyn IDatabaseTransaction {
771        if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
772            assert_eq!(
773                access_token, self_global_dbtx_access_token,
774                "Invalid access key used to access global_dbtx"
775            );
776            &mut self.inner
777        } else {
778            self.inner.global_dbtx(access_token)
779        }
780    }
781}
782
783#[apply(async_trait_maybe_send!)]
784impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
785where
786    Inner: IDatabaseTransactionOpsCore,
787{
788    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
789        let key = self.get_full_key(key);
790        self.inner.raw_insert_bytes(&key, value).await
791    }
792
793    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794        let key = self.get_full_key(key);
795        self.inner.raw_get_bytes(&key).await
796    }
797
798    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
799        let key = self.get_full_key(key);
800        self.inner.raw_remove_entry(&key).await
801    }
802
803    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
804        let key = self.get_full_key(key_prefix);
805        let stream = self.inner.raw_find_by_prefix(&key).await?;
806        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
807    }
808
809    async fn raw_find_by_prefix_sorted_descending(
810        &mut self,
811        key_prefix: &[u8],
812    ) -> Result<PrefixStream<'_>> {
813        let key = self.get_full_key(key_prefix);
814        let stream = self
815            .inner
816            .raw_find_by_prefix_sorted_descending(&key)
817            .await?;
818        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
819    }
820
821    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
822        let range = self.get_full_range(range);
823        let stream = self
824            .inner
825            .raw_find_by_range(Range {
826                start: &range.start,
827                end: &range.end,
828            })
829            .await?;
830        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
831    }
832
833    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
834        let key = self.get_full_key(key_prefix);
835        self.inner.raw_remove_by_prefix(&key).await
836    }
837}
838
839#[apply(async_trait_maybe_send!)]
840impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
841where
842    Inner: IDatabaseTransactionOps,
843{
844    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
845        self.inner.rollback_tx_to_savepoint().await
846    }
847
848    async fn set_tx_savepoint(&mut self) -> Result<()> {
849        self.set_tx_savepoint().await
850    }
851}
852
853/// Core raw a operations database transactions supports
854///
855/// Used to enforce the same signature on all types supporting it
856#[apply(async_trait_maybe_send!)]
857pub trait IDatabaseTransactionOpsCore: MaybeSend {
858    /// Insert entry
859    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
860
861    /// Get key value
862    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
863
864    /// Remove entry by `key`
865    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
866
867    /// Returns an stream of key-value pairs with keys that start with
868    /// `key_prefix`, sorted by key.
869    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
870
871    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
872    async fn raw_find_by_prefix_sorted_descending(
873        &mut self,
874        key_prefix: &[u8],
875    ) -> Result<PrefixStream<'_>>;
876
877    /// Returns an stream of key-value pairs with keys within a `range`, sorted
878    /// by key. [`Range`] is an (half-open) range bounded inclusively below and
879    /// exclusively above.
880    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
881
882    /// Delete keys matching prefix
883    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
884}
885
886#[apply(async_trait_maybe_send!)]
887impl<T> IDatabaseTransactionOpsCore for Box<T>
888where
889    T: IDatabaseTransactionOpsCore + ?Sized,
890{
891    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
892        (**self).raw_insert_bytes(key, value).await
893    }
894
895    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
896        (**self).raw_get_bytes(key).await
897    }
898
899    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
900        (**self).raw_remove_entry(key).await
901    }
902
903    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
904        (**self).raw_find_by_prefix(key_prefix).await
905    }
906
907    async fn raw_find_by_prefix_sorted_descending(
908        &mut self,
909        key_prefix: &[u8],
910    ) -> Result<PrefixStream<'_>> {
911        (**self)
912            .raw_find_by_prefix_sorted_descending(key_prefix)
913            .await
914    }
915
916    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
917        (**self).raw_find_by_range(range).await
918    }
919
920    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
921        (**self).raw_remove_by_prefix(key_prefix).await
922    }
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for &mut T
927where
928    T: IDatabaseTransactionOpsCore + ?Sized,
929{
930    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
931        (**self).raw_insert_bytes(key, value).await
932    }
933
934    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
935        (**self).raw_get_bytes(key).await
936    }
937
938    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
939        (**self).raw_remove_entry(key).await
940    }
941
942    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
943        (**self).raw_find_by_prefix(key_prefix).await
944    }
945
946    async fn raw_find_by_prefix_sorted_descending(
947        &mut self,
948        key_prefix: &[u8],
949    ) -> Result<PrefixStream<'_>> {
950        (**self)
951            .raw_find_by_prefix_sorted_descending(key_prefix)
952            .await
953    }
954
955    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
956        (**self).raw_find_by_range(range).await
957    }
958
959    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
960        (**self).raw_remove_by_prefix(key_prefix).await
961    }
962}
963
964/// Additional operations (only some) database transactions expose, on top of
965/// [`IDatabaseTransactionOpsCore`]
966///
967/// In certain contexts exposing these operations would be a problem, so they
968/// are moved to a separate trait.
969#[apply(async_trait_maybe_send!)]
970pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
971    /// Create a savepoint during the transaction that can be rolled back to
972    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
973    /// atomically remove the writes that were applied since the savepoint
974    /// was created.
975    ///
976    /// Warning: Avoid using this in fedimint client code as not all database
977    /// transaction implementations will support setting a savepoint during
978    /// a transaction.
979    async fn set_tx_savepoint(&mut self) -> Result<()>;
980
981    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
982}
983
984#[apply(async_trait_maybe_send!)]
985impl<T> IDatabaseTransactionOps for Box<T>
986where
987    T: IDatabaseTransactionOps + ?Sized,
988{
989    async fn set_tx_savepoint(&mut self) -> Result<()> {
990        (**self).set_tx_savepoint().await
991    }
992
993    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
994        (**self).rollback_tx_to_savepoint().await
995    }
996}
997
998#[apply(async_trait_maybe_send!)]
999impl<T> IDatabaseTransactionOps for &mut T
1000where
1001    T: IDatabaseTransactionOps + ?Sized,
1002{
1003    async fn set_tx_savepoint(&mut self) -> Result<()> {
1004        (**self).set_tx_savepoint().await
1005    }
1006
1007    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1008        (**self).rollback_tx_to_savepoint().await
1009    }
1010}
1011
1012/// Like [`IDatabaseTransactionOpsCore`], but typed
1013///
1014/// Implemented via blanket impl for everything that implements
1015/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
1016/// [`WithDecoders`]).
1017#[apply(async_trait_maybe_send!)]
1018pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1019    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1020    where
1021        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1022
1023    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1024    where
1025        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1026        K::Value: MaybeSend + MaybeSync;
1027
1028    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1029    where
1030        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1031        K::Value: MaybeSend + MaybeSync;
1032
1033    async fn find_by_range<K>(
1034        &mut self,
1035        key_range: Range<K>,
1036    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1037    where
1038        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1039        K::Value: MaybeSend + MaybeSync;
1040
1041    async fn find_by_prefix<KP>(
1042        &mut self,
1043        key_prefix: &KP,
1044    ) -> Pin<
1045        Box<
1046            maybe_add_send!(
1047                dyn Stream<
1048                        Item = (
1049                            KP::Record,
1050                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1051                        ),
1052                    > + '_
1053            ),
1054        >,
1055    >
1056    where
1057        KP: DatabaseLookup + MaybeSend + MaybeSync,
1058        KP::Record: DatabaseKey;
1059
1060    async fn find_by_prefix_sorted_descending<KP>(
1061        &mut self,
1062        key_prefix: &KP,
1063    ) -> Pin<
1064        Box<
1065            maybe_add_send!(
1066                dyn Stream<
1067                        Item = (
1068                            KP::Record,
1069                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1070                        ),
1071                    > + '_
1072            ),
1073        >,
1074    >
1075    where
1076        KP: DatabaseLookup + MaybeSend + MaybeSync,
1077        KP::Record: DatabaseKey;
1078
1079    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1080    where
1081        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1082
1083    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1084    where
1085        KP: DatabaseLookup + MaybeSend + MaybeSync;
1086}
1087
1088// blanket implementation of typed ops for anything that implements raw ops and
1089// has decoders
1090#[apply(async_trait_maybe_send!)]
1091impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1092where
1093    T: IDatabaseTransactionOpsCore + WithDecoders,
1094{
1095    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1096    where
1097        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1098    {
1099        let key_bytes = key.to_bytes();
1100        let raw = self
1101            .raw_get_bytes(&key_bytes)
1102            .await
1103            .expect("Unrecoverable error occurred while reading and entry from the database");
1104        raw.map(|value_bytes| {
1105            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1106        })
1107    }
1108
1109    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1110    where
1111        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1112        K::Value: MaybeSend + MaybeSync,
1113    {
1114        let key_bytes = key.to_bytes();
1115        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1116            .await
1117            .expect("Unrecoverable error occurred while inserting entry into the database")
1118            .map(|value_bytes| {
1119                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1120            })
1121    }
1122
1123    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1124    where
1125        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1126        K::Value: MaybeSend + MaybeSync,
1127    {
1128        if let Some(prev) = self.insert_entry(key, value).await {
1129            debug_assert!(
1130                false,
1131                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1132            );
1133            warn!(
1134                target: LOG_DB,
1135                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1136            );
1137        }
1138    }
1139
1140    async fn find_by_range<K>(
1141        &mut self,
1142        key_range: Range<K>,
1143    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1144    where
1145        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1146        K::Value: MaybeSend + MaybeSync,
1147    {
1148        let decoders = self.decoders().clone();
1149        Box::pin(
1150            self.raw_find_by_range(Range {
1151                start: &key_range.start.to_bytes(),
1152                end: &key_range.end.to_bytes(),
1153            })
1154            .await
1155            .expect("Unrecoverable error occurred while listing entries from the database")
1156            .map(move |(key_bytes, value_bytes)| {
1157                let key = decode_key_expect(&key_bytes, &decoders);
1158                let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1159                (key, value)
1160            }),
1161        )
1162    }
1163
1164    async fn find_by_prefix<KP>(
1165        &mut self,
1166        key_prefix: &KP,
1167    ) -> Pin<
1168        Box<
1169            maybe_add_send!(
1170                dyn Stream<
1171                        Item = (
1172                            KP::Record,
1173                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1174                        ),
1175                    > + '_
1176            ),
1177        >,
1178    >
1179    where
1180        KP: DatabaseLookup + MaybeSend + MaybeSync,
1181        KP::Record: DatabaseKey,
1182    {
1183        let decoders = self.decoders().clone();
1184        Box::pin(
1185            self.raw_find_by_prefix(&key_prefix.to_bytes())
1186                .await
1187                .expect("Unrecoverable error occurred while listing entries from the database")
1188                .map(move |(key_bytes, value_bytes)| {
1189                    let key = decode_key_expect(&key_bytes, &decoders);
1190                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1191                    (key, value)
1192                }),
1193        )
1194    }
1195
1196    async fn find_by_prefix_sorted_descending<KP>(
1197        &mut self,
1198        key_prefix: &KP,
1199    ) -> Pin<
1200        Box<
1201            maybe_add_send!(
1202                dyn Stream<
1203                        Item = (
1204                            KP::Record,
1205                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1206                        ),
1207                    > + '_
1208            ),
1209        >,
1210    >
1211    where
1212        KP: DatabaseLookup + MaybeSend + MaybeSync,
1213        KP::Record: DatabaseKey,
1214    {
1215        let decoders = self.decoders().clone();
1216        Box::pin(
1217            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1218                .await
1219                .expect("Unrecoverable error occurred while listing entries from the database")
1220                .map(move |(key_bytes, value_bytes)| {
1221                    let key = decode_key_expect(&key_bytes, &decoders);
1222                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1223                    (key, value)
1224                }),
1225        )
1226    }
1227    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1228    where
1229        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1230    {
1231        let key_bytes = key.to_bytes();
1232        self.raw_remove_entry(&key_bytes)
1233            .await
1234            .expect("Unrecoverable error occurred while inserting removing entry from the database")
1235            .map(|value_bytes| {
1236                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1237            })
1238    }
1239    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1240    where
1241        KP: DatabaseLookup + MaybeSend + MaybeSync,
1242    {
1243        self.raw_remove_by_prefix(&key_prefix.to_bytes())
1244            .await
1245            .expect("Unrecoverable error when removing entries from the database");
1246    }
1247}
1248
1249/// A database type that has decoders, which allows it to implement
1250/// [`IDatabaseTransactionOpsCoreTyped`]
1251pub trait WithDecoders {
1252    fn decoders(&self) -> &ModuleDecoderRegistry;
1253}
1254
1255/// Raw database transaction (e.g. rocksdb implementation)
1256#[apply(async_trait_maybe_send!)]
1257pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1258    async fn commit_tx(self) -> Result<()>;
1259}
1260
1261/// Fedimint database transaction
1262///
1263/// See [`IDatabase`] for more info.
1264#[apply(async_trait_maybe_send!)]
1265pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1266    /// Commit the transaction
1267    async fn commit_tx(&mut self) -> Result<()>;
1268
1269    /// Is global database
1270    fn is_global(&self) -> bool;
1271
1272    /// Get the global database tx from a module-prefixed database transaction
1273    ///
1274    /// Meant to be called only by core internals, and module developers should
1275    /// not call it directly.
1276    #[doc(hidden)]
1277    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1278        -> &mut dyn IDatabaseTransaction;
1279}
1280
1281#[apply(async_trait_maybe_send!)]
1282impl<T> IDatabaseTransaction for Box<T>
1283where
1284    T: IDatabaseTransaction + ?Sized,
1285{
1286    async fn commit_tx(&mut self) -> Result<()> {
1287        (**self).commit_tx().await
1288    }
1289
1290    fn is_global(&self) -> bool {
1291        (**self).is_global()
1292    }
1293
1294    fn global_dbtx(
1295        &mut self,
1296        access_token: GlobalDBTxAccessToken,
1297    ) -> &mut dyn IDatabaseTransaction {
1298        (**self).global_dbtx(access_token)
1299    }
1300}
1301
1302#[apply(async_trait_maybe_send!)]
1303impl<'a, T> IDatabaseTransaction for &'a mut T
1304where
1305    T: IDatabaseTransaction + ?Sized,
1306{
1307    async fn commit_tx(&mut self) -> Result<()> {
1308        (**self).commit_tx().await
1309    }
1310
1311    fn is_global(&self) -> bool {
1312        (**self).is_global()
1313    }
1314
1315    fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1316        (**self).global_dbtx(access_key)
1317    }
1318}
1319
1320/// Struct that implements `IRawDatabaseTransaction` and can be wrapped
1321/// easier in other structs since it does not consumed `self` by move.
1322struct BaseDatabaseTransaction<Tx> {
1323    // TODO: merge options
1324    raw: Option<Tx>,
1325    notify_queue: Option<NotifyQueue>,
1326    notifications: Arc<Notifications>,
1327}
1328
1329impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1330where
1331    Tx: fmt::Debug,
1332{
1333    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1334        f.write_fmt(format_args!(
1335            "BaseDatabaseTransaction{{ raw={:?} }}",
1336            self.raw
1337        ))
1338    }
1339}
1340impl<Tx> BaseDatabaseTransaction<Tx>
1341where
1342    Tx: IRawDatabaseTransaction,
1343{
1344    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1345        Self {
1346            raw: Some(dbtx),
1347            notifications,
1348            notify_queue: Some(NotifyQueue::new()),
1349        }
1350    }
1351
1352    fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1353        self.notify_queue
1354            .as_mut()
1355            .context("can not call add_notification_key after commit")?
1356            .add(&key);
1357        Ok(())
1358    }
1359}
1360
1361#[apply(async_trait_maybe_send!)]
1362impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1363    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1364        self.add_notification_key(key)?;
1365        self.raw
1366            .as_mut()
1367            .context("Cannot insert into already consumed transaction")?
1368            .raw_insert_bytes(key, value)
1369            .await
1370    }
1371
1372    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1373        self.raw
1374            .as_mut()
1375            .context("Cannot retrieve from already consumed transaction")?
1376            .raw_get_bytes(key)
1377            .await
1378    }
1379
1380    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1381        self.add_notification_key(key)?;
1382        self.raw
1383            .as_mut()
1384            .context("Cannot remove from already consumed transaction")?
1385            .raw_remove_entry(key)
1386            .await
1387    }
1388
1389    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1390        self.raw
1391            .as_mut()
1392            .context("Cannot retrieve from already consumed transaction")?
1393            .raw_find_by_range(key_range)
1394            .await
1395    }
1396
1397    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1398        self.raw
1399            .as_mut()
1400            .context("Cannot retrieve from already consumed transaction")?
1401            .raw_find_by_prefix(key_prefix)
1402            .await
1403    }
1404
1405    async fn raw_find_by_prefix_sorted_descending(
1406        &mut self,
1407        key_prefix: &[u8],
1408    ) -> Result<PrefixStream<'_>> {
1409        self.raw
1410            .as_mut()
1411            .context("Cannot retrieve from already consumed transaction")?
1412            .raw_find_by_prefix_sorted_descending(key_prefix)
1413            .await
1414    }
1415
1416    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1417        self.raw
1418            .as_mut()
1419            .context("Cannot remove from already consumed transaction")?
1420            .raw_remove_by_prefix(key_prefix)
1421            .await
1422    }
1423}
1424
1425#[apply(async_trait_maybe_send!)]
1426impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1427    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1428        self.raw
1429            .as_mut()
1430            .context("Cannot rollback to a savepoint on an already consumed transaction")?
1431            .rollback_tx_to_savepoint()
1432            .await?;
1433        Ok(())
1434    }
1435
1436    async fn set_tx_savepoint(&mut self) -> Result<()> {
1437        self.raw
1438            .as_mut()
1439            .context("Cannot set a tx savepoint on an already consumed transaction")?
1440            .set_tx_savepoint()
1441            .await?;
1442        Ok(())
1443    }
1444}
1445
1446#[apply(async_trait_maybe_send!)]
1447impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1448    for BaseDatabaseTransaction<Tx>
1449{
1450    async fn commit_tx(&mut self) -> Result<()> {
1451        self.raw
1452            .take()
1453            .context("Cannot commit an already committed transaction")?
1454            .commit_tx()
1455            .await?;
1456        self.notifications.submit_queue(
1457            &self
1458                .notify_queue
1459                .take()
1460                .expect("commit must be called only once"),
1461        );
1462        Ok(())
1463    }
1464
1465    fn is_global(&self) -> bool {
1466        true
1467    }
1468
1469    fn global_dbtx(
1470        &mut self,
1471        _access_token: GlobalDBTxAccessToken,
1472    ) -> &mut dyn IDatabaseTransaction {
1473        panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1474    }
1475}
1476
1477/// A helper for tracking and logging on `Drop` any instances of uncommitted
1478/// writes
1479#[derive(Clone)]
1480struct CommitTracker {
1481    /// Is the dbtx committed
1482    is_committed: bool,
1483    /// Does the dbtx have any writes
1484    has_writes: bool,
1485    /// Don't warn-log uncommitted writes
1486    ignore_uncommitted: bool,
1487}
1488
1489impl Drop for CommitTracker {
1490    fn drop(&mut self) {
1491        if self.has_writes && !self.is_committed {
1492            if self.ignore_uncommitted {
1493                trace!(
1494                    target: LOG_DB,
1495                    "DatabaseTransaction has writes and has not called commit, but that's expected."
1496                );
1497            } else {
1498                warn!(
1499                    target: LOG_DB,
1500                    location = ?backtrace::Backtrace::new(),
1501                    "DatabaseTransaction has writes and has not called commit."
1502                );
1503            }
1504        }
1505    }
1506}
1507
1508enum MaybeRef<'a, T> {
1509    Owned(T),
1510    Borrowed(&'a mut T),
1511}
1512
1513impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1514    type Target = T;
1515
1516    fn deref(&self) -> &Self::Target {
1517        match self {
1518            MaybeRef::Owned(o) => o,
1519            MaybeRef::Borrowed(r) => r,
1520        }
1521    }
1522}
1523
1524impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1525    fn deref_mut(&mut self) -> &mut Self::Target {
1526        match self {
1527            MaybeRef::Owned(o) => o,
1528            MaybeRef::Borrowed(r) => r,
1529        }
1530    }
1531}
1532
1533/// Session type for [`DatabaseTransaction`] that is allowed to commit
1534///
1535/// Opposite of [`NonCommittable`].
1536pub struct Committable;
1537
1538/// Session type for a [`DatabaseTransaction`] that is not allowed to commit
1539///
1540/// Opposite of [`Committable`].
1541
1542pub struct NonCommittable;
1543/// A high level database transaction handle
1544///
1545/// `Cap` is a session type
1546pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1547    tx: Box<dyn IDatabaseTransaction + 'tx>,
1548    decoders: ModuleDecoderRegistry,
1549    commit_tracker: MaybeRef<'tx, CommitTracker>,
1550    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1551    capability: marker::PhantomData<Cap>,
1552}
1553
1554impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1555    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1556        f.write_fmt(format_args!(
1557            "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1558            self.tx, self.decoders
1559        ))
1560    }
1561}
1562
1563impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1564    fn decoders(&self) -> &ModuleDecoderRegistry {
1565        &self.decoders
1566    }
1567}
1568
1569#[instrument(level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1570fn decode_value<V: DatabaseValue>(
1571    value_bytes: &[u8],
1572    decoders: &ModuleDecoderRegistry,
1573) -> Result<V, DecodingError> {
1574    trace!(
1575        bytes = %AbbreviateHexBytes(value_bytes),
1576        "decoding value",
1577    );
1578    V::from_bytes(value_bytes, decoders)
1579}
1580
1581fn decode_value_expect<V: DatabaseValue>(
1582    value_bytes: &[u8],
1583    decoders: &ModuleDecoderRegistry,
1584    key_bytes: &[u8],
1585) -> V {
1586    decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1587        panic!(
1588            "Unrecoverable decoding DatabaseValue as {}; err={}, bytes={}; key_bytes={}",
1589            any::type_name::<V>(),
1590            err,
1591            AbbreviateHexBytes(value_bytes),
1592            AbbreviateHexBytes(key_bytes),
1593        )
1594    })
1595}
1596
1597fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1598    trace!(
1599        bytes = %AbbreviateHexBytes(key_bytes),
1600        "decoding key",
1601    );
1602    K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1603        panic!(
1604            "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1605            any::type_name::<K>(),
1606            err,
1607            AbbreviateHexBytes(key_bytes)
1608        )
1609    })
1610}
1611
1612impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1613    /// Convert into a non-committable version
1614    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1615        DatabaseTransaction {
1616            tx: self.tx,
1617            decoders: self.decoders,
1618            commit_tracker: self.commit_tracker,
1619            on_commit_hooks: self.on_commit_hooks,
1620            capability: PhantomData::<NonCommittable>,
1621        }
1622    }
1623
1624    /// Get a reference to a non-committeable version
1625    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1626    where
1627        's: 'a,
1628    {
1629        self.to_ref().into_nc()
1630    }
1631
1632    /// Get [`DatabaseTransaction`] isolated to a `prefix`
1633    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1634    where
1635        'tx: 'a,
1636    {
1637        DatabaseTransaction {
1638            tx: Box::new(PrefixDatabaseTransaction {
1639                inner: self.tx,
1640                global_dbtx_access_token: None,
1641                prefix,
1642            }),
1643            decoders: self.decoders,
1644            commit_tracker: self.commit_tracker,
1645            on_commit_hooks: self.on_commit_hooks,
1646            capability: self.capability,
1647        }
1648    }
1649
1650    /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1651    /// `module_instance_id`, allowing the module to access global_dbtx
1652    /// with the right access token.
1653    pub fn with_prefix_module_id<'a: 'tx>(
1654        self,
1655        module_instance_id: ModuleInstanceId,
1656    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1657    where
1658        'tx: 'a,
1659    {
1660        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1661        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1662        (
1663            DatabaseTransaction {
1664                tx: Box::new(PrefixDatabaseTransaction {
1665                    inner: self.tx,
1666                    global_dbtx_access_token: Some(global_dbtx_access_token),
1667                    prefix,
1668                }),
1669                decoders: self.decoders,
1670                commit_tracker: self.commit_tracker,
1671                on_commit_hooks: self.on_commit_hooks,
1672                capability: self.capability,
1673            },
1674            global_dbtx_access_token,
1675        )
1676    }
1677
1678    /// Get [`DatabaseTransaction`] to `self`
1679    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1680    where
1681        's: 'a,
1682    {
1683        let decoders = self.decoders.clone();
1684
1685        DatabaseTransaction {
1686            tx: Box::new(&mut self.tx),
1687            decoders,
1688            commit_tracker: match self.commit_tracker {
1689                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1690                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1691            },
1692            on_commit_hooks: match self.on_commit_hooks {
1693                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1694                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1695            },
1696            capability: self.capability,
1697        }
1698    }
1699
1700    /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1701    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1702    where
1703        'tx: 'a,
1704    {
1705        DatabaseTransaction {
1706            tx: Box::new(PrefixDatabaseTransaction {
1707                inner: &mut self.tx,
1708                global_dbtx_access_token: None,
1709                prefix,
1710            }),
1711            decoders: self.decoders.clone(),
1712            commit_tracker: match self.commit_tracker {
1713                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1714                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1715            },
1716            on_commit_hooks: match self.on_commit_hooks {
1717                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1718                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1719            },
1720            capability: self.capability,
1721        }
1722    }
1723
1724    pub fn to_ref_with_prefix_module_id<'a>(
1725        &'a mut self,
1726        module_instance_id: ModuleInstanceId,
1727    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1728    where
1729        'tx: 'a,
1730    {
1731        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1732        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1733        (
1734            DatabaseTransaction {
1735                tx: Box::new(PrefixDatabaseTransaction {
1736                    inner: &mut self.tx,
1737                    global_dbtx_access_token: Some(global_dbtx_access_token),
1738                    prefix,
1739                }),
1740                decoders: self.decoders.clone(),
1741                commit_tracker: match self.commit_tracker {
1742                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1743                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1744                },
1745                on_commit_hooks: match self.on_commit_hooks {
1746                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1747                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1748                },
1749                capability: self.capability,
1750            },
1751            global_dbtx_access_token,
1752        )
1753    }
1754
1755    /// Is this `Database` a global, unpartitioned `Database`
1756    pub fn is_global(&self) -> bool {
1757        self.tx.is_global()
1758    }
1759
1760    /// `Err` if [`Self::is_global`] is not true
1761    pub fn ensure_global(&self) -> Result<()> {
1762        if !self.is_global() {
1763            bail!("Database instance not global");
1764        }
1765
1766        Ok(())
1767    }
1768
1769    /// `Err` if [`Self::is_global`] is true
1770    pub fn ensure_isolated(&self) -> Result<()> {
1771        if self.is_global() {
1772            bail!("Database instance not isolated");
1773        }
1774
1775        Ok(())
1776    }
1777
1778    /// Cancel the tx to avoid debugging warnings about uncommitted writes
1779    pub fn ignore_uncommitted(&mut self) -> &mut Self {
1780        self.commit_tracker.ignore_uncommitted = true;
1781        self
1782    }
1783
1784    /// Create warnings about uncommitted writes
1785    pub fn warn_uncommitted(&mut self) -> &mut Self {
1786        self.commit_tracker.ignore_uncommitted = false;
1787        self
1788    }
1789
1790    /// Register a hook that will be run after commit succeeds.
1791    #[instrument(level = "trace", skip_all)]
1792    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1793        self.on_commit_hooks.push(Box::new(f));
1794    }
1795
1796    pub fn global_dbtx<'a>(
1797        &'a mut self,
1798        access_token: GlobalDBTxAccessToken,
1799    ) -> DatabaseTransaction<'a, Cap>
1800    where
1801        'tx: 'a,
1802    {
1803        let decoders = self.decoders.clone();
1804
1805        DatabaseTransaction {
1806            tx: Box::new(self.tx.global_dbtx(access_token)),
1807            decoders,
1808            commit_tracker: match self.commit_tracker {
1809                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1810                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1811            },
1812            on_commit_hooks: match self.on_commit_hooks {
1813                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1814                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1815            },
1816            capability: self.capability,
1817        }
1818    }
1819}
1820
1821/// Code used to access `global_dbtx`
1822#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1823pub struct GlobalDBTxAccessToken(u32);
1824
1825impl GlobalDBTxAccessToken {
1826    /// Calculate an access code for accessing global_dbtx from a prefixed
1827    /// database tx
1828    ///
1829    /// Since we need to do it at runtime, we want the user modules not to be
1830    /// able to call `global_dbtx` too easily. But at the same time we don't
1831    /// need to be paranoid.
1832    ///
1833    /// This must be deterministic during whole instance of the software running
1834    /// (because it's being rederived independently in multiple codepahs) , but
1835    /// it could be somewhat randomized between different runs and releases.
1836    fn from_prefix(prefix: &[u8]) -> Self {
1837        Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1838    }
1839}
1840
1841impl<'tx> DatabaseTransaction<'tx, Committable> {
1842    pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1843        Self {
1844            tx: dbtx,
1845            decoders,
1846            commit_tracker: MaybeRef::Owned(CommitTracker {
1847                is_committed: false,
1848                has_writes: false,
1849                ignore_uncommitted: false,
1850            }),
1851            on_commit_hooks: MaybeRef::Owned(vec![]),
1852            capability: PhantomData,
1853        }
1854    }
1855
1856    pub async fn commit_tx_result(mut self) -> Result<()> {
1857        self.commit_tracker.is_committed = true;
1858        let commit_result = self.tx.commit_tx().await;
1859
1860        // Run commit hooks in case commit was successful
1861        if commit_result.is_ok() {
1862            for hook in self.on_commit_hooks.deref_mut().drain(..) {
1863                hook();
1864            }
1865        }
1866
1867        commit_result
1868    }
1869
1870    pub async fn commit_tx(mut self) {
1871        self.commit_tracker.is_committed = true;
1872        self.commit_tx_result()
1873            .await
1874            .expect("Unrecoverable error occurred while committing to the database.");
1875    }
1876}
1877
1878#[apply(async_trait_maybe_send!)]
1879impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1880where
1881    Cap: Send,
1882{
1883    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1884        self.commit_tracker.has_writes = true;
1885        self.tx.raw_insert_bytes(key, value).await
1886    }
1887
1888    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1889        self.tx.raw_get_bytes(key).await
1890    }
1891
1892    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1893        self.tx.raw_remove_entry(key).await
1894    }
1895
1896    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1897        self.tx.raw_find_by_range(key_range).await
1898    }
1899
1900    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1901        self.tx.raw_find_by_prefix(key_prefix).await
1902    }
1903
1904    async fn raw_find_by_prefix_sorted_descending(
1905        &mut self,
1906        key_prefix: &[u8],
1907    ) -> Result<PrefixStream<'_>> {
1908        self.tx
1909            .raw_find_by_prefix_sorted_descending(key_prefix)
1910            .await
1911    }
1912
1913    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1914        self.commit_tracker.has_writes = true;
1915        self.tx.raw_remove_by_prefix(key_prefix).await
1916    }
1917}
1918#[apply(async_trait_maybe_send!)]
1919impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1920    async fn set_tx_savepoint(&mut self) -> Result<()> {
1921        self.tx.set_tx_savepoint().await
1922    }
1923
1924    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1925        self.tx.rollback_tx_to_savepoint().await
1926    }
1927}
1928
1929impl<T> DatabaseKeyPrefix for T
1930where
1931    T: DatabaseLookup + crate::encoding::Encodable + Debug,
1932{
1933    fn to_bytes(&self) -> Vec<u8> {
1934        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1935        self.consensus_encode(&mut data)
1936            .expect("Writing to vec is infallible");
1937        data
1938    }
1939}
1940
1941impl<T> DatabaseKey for T
1942where
1943    // Note: key can only be `T` that can be decoded without modules (even if
1944    // module type is `()`)
1945    T: DatabaseRecord + crate::encoding::Decodable + Sized,
1946{
1947    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1948    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1949        if data.is_empty() {
1950            // TODO: build better coding errors, pretty useless right now
1951            return Err(DecodingError::wrong_length(1, 0));
1952        }
1953
1954        if data[0] != Self::DB_PREFIX {
1955            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1956        }
1957
1958        <Self as crate::encoding::Decodable>::consensus_decode(
1959            &mut std::io::Cursor::new(&data[1..]),
1960            modules,
1961        )
1962        .map_err(|decode_error| DecodingError::Other(decode_error.0))
1963    }
1964}
1965
1966impl<T> DatabaseValue for T
1967where
1968    T: Debug + Encodable + Decodable,
1969{
1970    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1971        T::consensus_decode(&mut std::io::Cursor::new(data), modules)
1972            .map_err(|e| DecodingError::Other(e.0))
1973    }
1974
1975    fn to_bytes(&self) -> Vec<u8> {
1976        let mut bytes = Vec::new();
1977        self.consensus_encode(&mut bytes)
1978            .expect("writing to vec can't fail");
1979        bytes
1980    }
1981}
1982
1983/// This is a helper macro that generates the implementations of
1984/// `DatabaseRecord` necessary for reading/writing to the
1985/// database and fetching by prefix.
1986///
1987/// - `key`: This is the type of struct that will be used as the key into the
1988///   database
1989/// - `value`: This is the type of struct that will be used as the value into
1990///   the database
1991/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
1992///   prepended to this key
1993/// - `query_prefix`: Optional type of struct that can be passed zero or more
1994///   times. Every query prefix can be used to query the database via
1995///   `find_by_prefix`
1996///
1997/// # Examples
1998///
1999/// ```
2000/// use fedimint_core::encoding::{Decodable, Encodable};
2001/// use fedimint_core::impl_db_record;
2002///
2003/// #[derive(Debug, Encodable, Decodable)]
2004/// struct MyKey;
2005///
2006/// #[derive(Debug, Encodable, Decodable)]
2007/// struct MyValue;
2008///
2009/// #[repr(u8)]
2010/// #[derive(Clone, Debug)]
2011/// pub enum DbKeyPrefix {
2012///     MyKey = 0x50,
2013/// }
2014///
2015/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
2016/// ```
2017///
2018/// Use the required parameters and specify one `query_prefix`
2019///
2020/// ```
2021/// use fedimint_core::encoding::{Decodable, Encodable};
2022/// use fedimint_core::{impl_db_lookup, impl_db_record};
2023///
2024/// #[derive(Debug, Encodable, Decodable)]
2025/// struct MyKey;
2026///
2027/// #[derive(Debug, Encodable, Decodable)]
2028/// struct MyValue;
2029///
2030/// #[repr(u8)]
2031/// #[derive(Clone, Debug)]
2032/// pub enum DbKeyPrefix {
2033///     MyKey = 0x50,
2034/// }
2035///
2036/// #[derive(Debug, Encodable, Decodable)]
2037/// struct MyKeyPrefix;
2038///
2039/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
2040///
2041/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
2042/// ```
2043#[macro_export]
2044macro_rules! impl_db_record {
2045    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
2046        impl $crate::db::DatabaseRecord for $key {
2047            const DB_PREFIX: u8 = $db_prefix as u8;
2048            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2049            type Key = Self;
2050            type Value = $val;
2051        }
2052        $(
2053            impl_db_record! {
2054                @impl_notify_marker key = $key, notify_on_modify = $notify
2055            }
2056        )?
2057    };
2058    // if notify is set to true
2059    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2060        impl $crate::db::DatabaseKeyWithNotify for $key {}
2061    };
2062    // if notify is set to false
2063    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2064}
2065
2066#[macro_export]
2067macro_rules! impl_db_lookup{
2068    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2069        $(
2070            impl $crate::db::DatabaseLookup for $query_prefix {
2071                type Record = $key;
2072            }
2073        )*
2074    };
2075}
2076
2077/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
2078#[derive(Debug, Encodable, Decodable, Serialize)]
2079pub struct DatabaseVersionKeyV0;
2080
2081#[derive(Debug, Encodable, Decodable, Serialize)]
2082pub struct DatabaseVersionKey(pub ModuleInstanceId);
2083
2084#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2085pub struct DatabaseVersion(pub u64);
2086
2087impl_db_record!(
2088    key = DatabaseVersionKeyV0,
2089    value = DatabaseVersion,
2090    db_prefix = DbKeyPrefix::DatabaseVersion
2091);
2092
2093impl_db_record!(
2094    key = DatabaseVersionKey,
2095    value = DatabaseVersion,
2096    db_prefix = DbKeyPrefix::DatabaseVersion
2097);
2098
2099impl std::fmt::Display for DatabaseVersion {
2100    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2101        write!(f, "{}", self.0)
2102    }
2103}
2104
2105impl DatabaseVersion {
2106    pub fn increment(&self) -> Self {
2107        Self(self.0 + 1)
2108    }
2109}
2110
2111impl std::fmt::Display for DbKeyPrefix {
2112    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2113        write!(f, "{self:?}")
2114    }
2115}
2116
2117#[repr(u8)]
2118#[derive(Clone, EnumIter, Debug)]
2119pub enum DbKeyPrefix {
2120    DatabaseVersion = 0x50,
2121    ClientBackup = 0x51,
2122}
2123
2124#[derive(Debug, Error)]
2125pub enum DecodingError {
2126    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2127    WrongPrefix { expected: u8, found: u8 },
2128    #[error("Key had a wrong length, expected {expected} but got {found}")]
2129    WrongLength { expected: usize, found: usize },
2130    #[error("Other decoding error: {0}")]
2131    Other(anyhow::Error),
2132}
2133
2134impl DecodingError {
2135    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2136        Self::Other(anyhow::Error::from(error))
2137    }
2138
2139    pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2140        Self::WrongPrefix { expected, found }
2141    }
2142
2143    pub fn wrong_length(expected: usize, found: usize) -> Self {
2144        Self::WrongLength { expected, found }
2145    }
2146}
2147
2148#[macro_export]
2149macro_rules! push_db_pair_items {
2150    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2151        let db_items =
2152            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2153                .await
2154                .map(|(key, val)| {
2155                    (
2156                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2157                        val,
2158                    )
2159                })
2160                .collect::<BTreeMap<String, $value_type>>()
2161                .await;
2162
2163        $map.insert($key_literal.to_string(), Box::new(db_items));
2164    };
2165}
2166
2167#[macro_export]
2168macro_rules! push_db_key_items {
2169    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => {
2170        let db_items =
2171            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2172                .await
2173                .map(|(key, _)| key)
2174                .collect::<Vec<$key_type>>()
2175                .await;
2176
2177        $map.insert($key_literal.to_string(), Box::new(db_items));
2178    };
2179}
2180
2181/// `CoreMigrationFn` that modules can implement to "migrate" the database
2182/// to the next database version.
2183pub type CoreMigrationFn = for<'tx> fn(
2184    MigrationContext<'tx>,
2185) -> Pin<
2186    Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2187>;
2188
2189/// Verifies that all database migrations are defined contiguously and returns
2190/// the "current" database version, which is one greater than the last key in
2191/// the map.
2192pub fn get_current_database_version<F>(
2193    migrations: &BTreeMap<DatabaseVersion, F>,
2194) -> DatabaseVersion {
2195    let versions = migrations.keys().copied().collect::<Vec<_>>();
2196
2197    // Verify that all database migrations are defined contiguously. If there is a
2198    // gap, this indicates a programming error and we should panic.
2199    if !versions
2200        .windows(2)
2201        .all(|window| window[0].increment() == window[1])
2202    {
2203        panic!("Database Migrations are not defined contiguously");
2204    }
2205
2206    versions
2207        .last()
2208        .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2209}
2210
2211/// Applies the database migrations to a non-isolated database.
2212pub async fn apply_migrations_server(
2213    db: &Database,
2214    kind: String,
2215    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2216) -> Result<(), anyhow::Error> {
2217    apply_migrations(db, kind, migrations, None, None).await
2218}
2219
2220/// `apply_migrations` iterates from the on disk database version for the
2221/// module.
2222///
2223/// `apply_migrations` iterates from the on disk database version for the module
2224/// up to `target_db_version` and executes all of the migrations that exist in
2225/// the migrations map. Each migration in migrations map updates the
2226/// database to have the correct on-disk structures that the code is expecting.
2227/// The entire migration process is atomic (i.e migration from 0->1 and 1->2
2228/// happen atomically). This function is called before the module is initialized
2229/// and as long as the correct migrations are supplied in the migrations map,
2230/// the module will be able to read and write from the database successfully.
2231pub async fn apply_migrations(
2232    db: &Database,
2233    kind: String,
2234    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2235    module_instance_id: Option<ModuleInstanceId>,
2236    // When used in client side context, we can/should ignore keys that external app
2237    // is allowed to use, and but since this function is shared, we make it optional argument
2238    external_prefixes_above: Option<u8>,
2239) -> Result<(), anyhow::Error> {
2240    // Newly created databases will not have any data since they have just been
2241    // instantiated.
2242    let mut dbtx = db.begin_transaction_nc().await;
2243    let is_new_db = dbtx
2244        .raw_find_by_prefix(&[])
2245        .await?
2246        .filter(|(key, _v)| {
2247            std::future::ready(
2248                external_prefixes_above.map_or(true, |external_prefixes_above| {
2249                    !key.is_empty() && key[0] < external_prefixes_above
2250                }),
2251            )
2252        })
2253        .next()
2254        .await
2255        .is_none();
2256
2257    let target_db_version = get_current_database_version(&migrations);
2258
2259    // First write the database version to disk if it does not exist.
2260    create_database_version(
2261        db,
2262        target_db_version,
2263        module_instance_id,
2264        kind.clone(),
2265        is_new_db,
2266    )
2267    .await?;
2268
2269    let mut global_dbtx = db.begin_transaction().await;
2270    let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2271
2272    let disk_version = global_dbtx
2273        .get_value(&DatabaseVersionKey(module_instance_id_key))
2274        .await;
2275
2276    let db_version = if let Some(disk_version) = disk_version {
2277        let mut current_db_version = disk_version;
2278
2279        if current_db_version > target_db_version {
2280            return Err(anyhow::anyhow!(format!(
2281                "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2282            )));
2283        }
2284
2285        while current_db_version < target_db_version {
2286            if let Some(migration) = migrations.get(&current_db_version) {
2287                info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2288                migration(MigrationContext {
2289                    dbtx: global_dbtx.to_ref_nc(),
2290                    module_instance_id,
2291                })
2292                .await?;
2293            } else {
2294                warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2295            }
2296
2297            current_db_version = current_db_version.increment();
2298            global_dbtx
2299                .insert_entry(
2300                    &DatabaseVersionKey(module_instance_id_key),
2301                    &current_db_version,
2302                )
2303                .await;
2304        }
2305
2306        current_db_version
2307    } else {
2308        target_db_version
2309    };
2310
2311    global_dbtx.commit_tx_result().await?;
2312    debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2313    Ok(())
2314}
2315
2316/// Creates the `DatabaseVersion` inside the database if it does not exist. If
2317/// necessary, this function will migrate the legacy database version to the
2318/// expected `DatabaseVersionKey`.
2319pub async fn create_database_version(
2320    db: &Database,
2321    target_db_version: DatabaseVersion,
2322    module_instance_id: Option<ModuleInstanceId>,
2323    kind: String,
2324    is_new_db: bool,
2325) -> Result<(), anyhow::Error> {
2326    let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2327
2328    // First check if the module has a `DatabaseVersion` written to
2329    // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
2330    // nothing to do.
2331    let mut global_dbtx = db.begin_transaction().await;
2332    if global_dbtx
2333        .get_value(&DatabaseVersionKey(key_module_instance_id))
2334        .await
2335        .is_none()
2336    {
2337        // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
2338        // in the module's isolated namespace (but not for fedimint-server or
2339        // fedimint-client).
2340        //
2341        // Otherwise, if the previous database contains data and no legacy database
2342        // version, use `DatabaseVersion(0)` so that all database migrations are
2343        // run. Otherwise, this database can assumed to be new and can use
2344        // `target_db_version` to skip the database migrations.
2345        let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2346            remove_current_db_version_if_exists(
2347                &mut global_dbtx
2348                    .to_ref_with_prefix_module_id(module_instance_id)
2349                    .0
2350                    .into_nc(),
2351                is_new_db,
2352                target_db_version,
2353            )
2354            .await
2355        } else {
2356            remove_current_db_version_if_exists(
2357                &mut global_dbtx.to_ref().into_nc(),
2358                is_new_db,
2359                target_db_version,
2360            )
2361            .await
2362        };
2363
2364        // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2365        info!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2366        global_dbtx
2367            .insert_new_entry(
2368                &DatabaseVersionKey(key_module_instance_id),
2369                &current_version_in_module,
2370            )
2371            .await;
2372        global_dbtx.commit_tx_result().await?;
2373    }
2374
2375    Ok(())
2376}
2377
2378/// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2379/// returns the current database version. If the current version does not
2380/// exist, use `target_db_version` if the database is new. Otherwise, return
2381/// `DatabaseVersion(0)` to ensure all migrations are run.
2382async fn remove_current_db_version_if_exists(
2383    version_dbtx: &mut DatabaseTransaction<'_>,
2384    is_new_db: bool,
2385    target_db_version: DatabaseVersion,
2386) -> DatabaseVersion {
2387    // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2388    // exist, just use the 0 for the version so that all of the migrations are
2389    // executed.
2390    let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2391    match current_version_in_module {
2392        Some(database_version) => database_version,
2393        None if is_new_db => target_db_version,
2394        None => DatabaseVersion(0),
2395    }
2396}
2397
2398/// Helper function to retrieve the `module_instance_id` for modules, otherwise
2399/// return 0xff for the global namespace.
2400fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2401    // Use 0xff for fedimint-server and the `module_instance_id` for each module
2402    module_instance_id.map_or_else(
2403        || MODULE_GLOBAL_PREFIX.into(),
2404        |module_instance_id| module_instance_id,
2405    )
2406}
2407
2408pub struct MigrationContext<'tx> {
2409    dbtx: DatabaseTransaction<'tx>,
2410    module_instance_id: Option<ModuleInstanceId>,
2411}
2412
2413impl<'tx> MigrationContext<'tx> {
2414    pub fn dbtx(&mut self) -> DatabaseTransaction {
2415        if let Some(module_instance_id) = self.module_instance_id {
2416            self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2417        } else {
2418            self.dbtx.to_ref_nc()
2419        }
2420    }
2421
2422    pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2423        self.module_instance_id
2424    }
2425
2426    #[doc(hidden)]
2427    pub fn __global_dbtx(&mut self) -> &mut DatabaseTransaction<'tx> {
2428        &mut self.dbtx
2429    }
2430}
2431
2432#[allow(unused_imports)]
2433mod test_utils {
2434    use std::collections::BTreeMap;
2435    use std::time::Duration;
2436
2437    use fedimint_core::db::MigrationContext;
2438    use futures::future::ready;
2439    use futures::{Future, FutureExt, StreamExt};
2440    use rand::Rng;
2441    use tokio::join;
2442
2443    use super::{
2444        apply_migrations, CoreMigrationFn, Database, DatabaseTransaction, DatabaseVersion,
2445        DatabaseVersionKey, DatabaseVersionKeyV0,
2446    };
2447    use crate::core::ModuleKind;
2448    use crate::db::mem_impl::MemDatabase;
2449    use crate::db::{
2450        IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2451    };
2452    use crate::encoding::{Decodable, Encodable};
2453    use crate::module::registry::ModuleDecoderRegistry;
2454
2455    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2456        crate::runtime::timeout(Duration::from_millis(10), fut)
2457            .await
2458            .ok()
2459    }
2460
2461    #[repr(u8)]
2462    #[derive(Clone)]
2463    pub enum TestDbKeyPrefix {
2464        Test = 0x42,
2465        AltTest = 0x43,
2466        PercentTestKey = 0x25,
2467    }
2468
2469    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2470    pub(super) struct TestKey(pub u64);
2471
2472    #[derive(Debug, Encodable, Decodable)]
2473    struct DbPrefixTestPrefix;
2474
2475    impl_db_record!(
2476        key = TestKey,
2477        value = TestVal,
2478        db_prefix = TestDbKeyPrefix::Test,
2479        notify_on_modify = true,
2480    );
2481    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2482
2483    #[derive(Debug, Encodable, Decodable)]
2484    struct TestKeyV0(u64, u64);
2485
2486    #[derive(Debug, Encodable, Decodable)]
2487    struct DbPrefixTestPrefixV0;
2488
2489    impl_db_record!(
2490        key = TestKeyV0,
2491        value = TestVal,
2492        db_prefix = TestDbKeyPrefix::Test,
2493    );
2494    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2495
2496    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2497    struct AltTestKey(u64);
2498
2499    #[derive(Debug, Encodable, Decodable)]
2500    struct AltDbPrefixTestPrefix;
2501
2502    impl_db_record!(
2503        key = AltTestKey,
2504        value = TestVal,
2505        db_prefix = TestDbKeyPrefix::AltTest,
2506    );
2507    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2508
2509    #[derive(Debug, Encodable, Decodable)]
2510    struct PercentTestKey(u64);
2511
2512    #[derive(Debug, Encodable, Decodable)]
2513    struct PercentPrefixTestPrefix;
2514
2515    impl_db_record!(
2516        key = PercentTestKey,
2517        value = TestVal,
2518        db_prefix = TestDbKeyPrefix::PercentTestKey,
2519    );
2520
2521    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2522    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2523    pub(super) struct TestVal(pub u64);
2524
2525    const TEST_MODULE_PREFIX: u16 = 1;
2526    const ALT_MODULE_PREFIX: u16 = 2;
2527
2528    pub async fn verify_insert_elements(db: Database) {
2529        let mut dbtx = db.begin_transaction().await;
2530        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2531        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2532        dbtx.commit_tx().await;
2533
2534        // Test values were persisted
2535        let mut dbtx = db.begin_transaction().await;
2536        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2537        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2538        dbtx.commit_tx().await;
2539
2540        // Test overwrites work as expected
2541        let mut dbtx = db.begin_transaction().await;
2542        assert_eq!(
2543            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2544            Some(TestVal(2))
2545        );
2546        assert_eq!(
2547            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2548            Some(TestVal(3))
2549        );
2550        dbtx.commit_tx().await;
2551
2552        let mut dbtx = db.begin_transaction().await;
2553        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2554        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2555        dbtx.commit_tx().await;
2556    }
2557
2558    pub async fn verify_remove_nonexisting(db: Database) {
2559        let mut dbtx = db.begin_transaction().await;
2560        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2561        let removed = dbtx.remove_entry(&TestKey(1)).await;
2562        assert!(removed.is_none());
2563
2564        // Commit to suppress the warning message
2565        dbtx.commit_tx().await;
2566    }
2567
2568    pub async fn verify_remove_existing(db: Database) {
2569        let mut dbtx = db.begin_transaction().await;
2570
2571        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2572
2573        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2574
2575        let removed = dbtx.remove_entry(&TestKey(1)).await;
2576        assert_eq!(removed, Some(TestVal(2)));
2577        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2578
2579        // Commit to suppress the warning message
2580        dbtx.commit_tx().await;
2581    }
2582
2583    pub async fn verify_read_own_writes(db: Database) {
2584        let mut dbtx = db.begin_transaction().await;
2585
2586        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2587
2588        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2589
2590        // Commit to suppress the warning message
2591        dbtx.commit_tx().await;
2592    }
2593
2594    pub async fn verify_prevent_dirty_reads(db: Database) {
2595        let mut dbtx = db.begin_transaction().await;
2596
2597        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2598
2599        // dbtx2 should not be able to see uncommitted changes
2600        let mut dbtx2 = db.begin_transaction().await;
2601        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2602
2603        // Commit to suppress the warning message
2604        dbtx.commit_tx().await;
2605    }
2606
2607    pub async fn verify_find_by_range(db: Database) {
2608        let mut dbtx = db.begin_transaction().await;
2609        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2610        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2611        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2612
2613        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2614        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2615
2616        {
2617            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2618            module_dbtx
2619                .insert_entry(&TestKey(300), &TestVal(3000))
2620                .await;
2621        }
2622
2623        dbtx.commit_tx().await;
2624
2625        // Verify finding by prefix returns the correct set of key pairs
2626        let mut dbtx = db.begin_transaction_nc().await;
2627
2628        let returned_keys = dbtx
2629            .find_by_range(TestKey(55)..TestKey(56))
2630            .await
2631            .collect::<Vec<_>>()
2632            .await;
2633
2634        let expected = vec![(TestKey(55), TestVal(9999))];
2635
2636        assert_eq!(returned_keys, expected);
2637
2638        let returned_keys = dbtx
2639            .find_by_range(TestKey(54)..TestKey(56))
2640            .await
2641            .collect::<Vec<_>>()
2642            .await;
2643
2644        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2645        assert_eq!(returned_keys, expected);
2646
2647        let returned_keys = dbtx
2648            .find_by_range(TestKey(54)..TestKey(57))
2649            .await
2650            .collect::<Vec<_>>()
2651            .await;
2652
2653        let expected = vec![
2654            (TestKey(54), TestVal(8888)),
2655            (TestKey(55), TestVal(9999)),
2656            (TestKey(56), TestVal(7777)),
2657        ];
2658        assert_eq!(returned_keys, expected);
2659
2660        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2661        let test_range = module_dbtx
2662            .find_by_range(TestKey(300)..TestKey(301))
2663            .await
2664            .collect::<Vec<_>>()
2665            .await;
2666        assert!(test_range.len() == 1);
2667    }
2668
2669    pub async fn verify_find_by_prefix(db: Database) {
2670        let mut dbtx = db.begin_transaction().await;
2671        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2672        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2673
2674        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2675        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2676        dbtx.commit_tx().await;
2677
2678        // Verify finding by prefix returns the correct set of key pairs
2679        let mut dbtx = db.begin_transaction().await;
2680
2681        let returned_keys = dbtx
2682            .find_by_prefix(&DbPrefixTestPrefix)
2683            .await
2684            .collect::<Vec<_>>()
2685            .await;
2686
2687        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2688        assert_eq!(returned_keys, expected);
2689
2690        let reversed = dbtx
2691            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2692            .await
2693            .collect::<Vec<_>>()
2694            .await;
2695        let mut reversed_expected = expected;
2696        reversed_expected.reverse();
2697        assert_eq!(reversed, reversed_expected);
2698
2699        let returned_keys = dbtx
2700            .find_by_prefix(&AltDbPrefixTestPrefix)
2701            .await
2702            .collect::<Vec<_>>()
2703            .await;
2704
2705        let expected = vec![
2706            (AltTestKey(54), TestVal(6666)),
2707            (AltTestKey(55), TestVal(7777)),
2708        ];
2709        assert_eq!(returned_keys, expected);
2710
2711        let reversed = dbtx
2712            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2713            .await
2714            .collect::<Vec<_>>()
2715            .await;
2716        let mut reversed_expected = expected;
2717        reversed_expected.reverse();
2718        assert_eq!(reversed, reversed_expected);
2719    }
2720
2721    pub async fn verify_commit(db: Database) {
2722        let mut dbtx = db.begin_transaction().await;
2723
2724        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2725        dbtx.commit_tx().await;
2726
2727        // Verify dbtx2 can see committed transactions
2728        let mut dbtx2 = db.begin_transaction().await;
2729        assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2730    }
2731
2732    pub async fn verify_rollback_to_savepoint(db: Database) {
2733        let mut dbtx_rollback = db.begin_transaction().await;
2734
2735        dbtx_rollback
2736            .insert_entry(&TestKey(20), &TestVal(2000))
2737            .await;
2738
2739        dbtx_rollback
2740            .set_tx_savepoint()
2741            .await
2742            .expect("Error setting transaction savepoint");
2743
2744        dbtx_rollback
2745            .insert_entry(&TestKey(21), &TestVal(2001))
2746            .await;
2747
2748        assert_eq!(
2749            dbtx_rollback.get_value(&TestKey(20)).await,
2750            Some(TestVal(2000))
2751        );
2752        assert_eq!(
2753            dbtx_rollback.get_value(&TestKey(21)).await,
2754            Some(TestVal(2001))
2755        );
2756
2757        dbtx_rollback
2758            .rollback_tx_to_savepoint()
2759            .await
2760            .expect("Error setting transaction savepoint");
2761
2762        assert_eq!(
2763            dbtx_rollback.get_value(&TestKey(20)).await,
2764            Some(TestVal(2000))
2765        );
2766
2767        assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2768
2769        // Commit to suppress the warning message
2770        dbtx_rollback.commit_tx().await;
2771    }
2772
2773    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2774        let mut dbtx = db.begin_transaction().await;
2775        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2776
2777        let mut dbtx2 = db.begin_transaction().await;
2778
2779        dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2780
2781        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2782
2783        dbtx2.commit_tx().await;
2784
2785        // dbtx should still read None because it is operating over a snapshot
2786        // of the data when the transaction started
2787        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2788
2789        let expected_keys = 0;
2790        let returned_keys = dbtx
2791            .find_by_prefix(&DbPrefixTestPrefix)
2792            .await
2793            .fold(0, |returned_keys, (key, value)| async move {
2794                if key == TestKey(100) {
2795                    assert!(value.eq(&TestVal(101)));
2796                }
2797                returned_keys + 1
2798            })
2799            .await;
2800
2801        assert_eq!(returned_keys, expected_keys);
2802    }
2803
2804    pub async fn verify_snapshot_isolation(db: Database) {
2805        async fn random_yield() {
2806            let times = if rand::thread_rng().gen_bool(0.5) {
2807                0
2808            } else {
2809                10
2810            };
2811            for _ in 0..times {
2812                tokio::task::yield_now().await;
2813            }
2814        }
2815
2816        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
2817        for i in 0..1000 {
2818            let base_key = i * 2;
2819            let tx_accepted_key = base_key;
2820            let spent_input_key = base_key + 1;
2821
2822            join!(
2823                async {
2824                    random_yield().await;
2825                    let mut dbtx = db.begin_transaction().await;
2826
2827                    random_yield().await;
2828                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2829                    random_yield().await;
2830                    // we have 4 operations that can give you the db key,
2831                    // try all of them
2832                    let s = match i % 5 {
2833                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2834                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2835                        2 => {
2836                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2837                                .await
2838                        }
2839                        3 => {
2840                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
2841                                .await
2842                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2843                                .map(|(_k, v)| v)
2844                                .next()
2845                                .await
2846                        }
2847                        4 => {
2848                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2849                                .await
2850                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2851                                .map(|(_k, v)| v)
2852                                .next()
2853                                .await
2854                        }
2855                        _ => {
2856                            panic!("woot?");
2857                        }
2858                    };
2859
2860                    match (a, s) {
2861                        (None, None) | (Some(_), Some(_)) => {}
2862                        (None, Some(_)) => panic!("none some?! {i}"),
2863                        (Some(_), None) => panic!("some none?! {i}"),
2864                    }
2865                },
2866                async {
2867                    random_yield().await;
2868
2869                    let mut dbtx = db.begin_transaction().await;
2870                    random_yield().await;
2871                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2872
2873                    random_yield().await;
2874                    assert_eq!(
2875                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2876                            .await,
2877                        None
2878                    );
2879
2880                    random_yield().await;
2881                    assert_eq!(
2882                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2883                            .await,
2884                        None
2885                    );
2886                    random_yield().await;
2887                    dbtx.commit_tx().await;
2888                }
2889            );
2890        }
2891    }
2892
2893    pub async fn verify_phantom_entry(db: Database) {
2894        let mut dbtx = db.begin_transaction().await;
2895
2896        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2897
2898        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2899
2900        dbtx.commit_tx().await;
2901
2902        let mut dbtx = db.begin_transaction().await;
2903        let expected_keys = 2;
2904        let returned_keys = dbtx
2905            .find_by_prefix(&DbPrefixTestPrefix)
2906            .await
2907            .fold(0, |returned_keys, (key, value)| async move {
2908                match key {
2909                    TestKey(100) => {
2910                        assert!(value.eq(&TestVal(101)));
2911                    }
2912                    TestKey(101) => {
2913                        assert!(value.eq(&TestVal(102)));
2914                    }
2915                    _ => {}
2916                };
2917                returned_keys + 1
2918            })
2919            .await;
2920
2921        assert_eq!(returned_keys, expected_keys);
2922
2923        let mut dbtx2 = db.begin_transaction().await;
2924
2925        dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2926
2927        dbtx2.commit_tx().await;
2928
2929        let returned_keys = dbtx
2930            .find_by_prefix(&DbPrefixTestPrefix)
2931            .await
2932            .fold(0, |returned_keys, (key, value)| async move {
2933                match key {
2934                    TestKey(100) => {
2935                        assert!(value.eq(&TestVal(101)));
2936                    }
2937                    TestKey(101) => {
2938                        assert!(value.eq(&TestVal(102)));
2939                    }
2940                    _ => {}
2941                };
2942                returned_keys + 1
2943            })
2944            .await;
2945
2946        assert_eq!(returned_keys, expected_keys);
2947    }
2948
2949    pub async fn expect_write_conflict(db: Database) {
2950        let mut dbtx = db.begin_transaction().await;
2951        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2952        dbtx.commit_tx().await;
2953
2954        let mut dbtx2 = db.begin_transaction().await;
2955        let mut dbtx3 = db.begin_transaction().await;
2956
2957        dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2958
2959        // Depending on if the database implementation supports optimistic or
2960        // pessimistic transactions, this test should generate an error here
2961        // (pessimistic) or at commit time (optimistic)
2962        dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
2963
2964        dbtx2.commit_tx().await;
2965        dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
2966    }
2967
2968    pub async fn verify_string_prefix(db: Database) {
2969        let mut dbtx = db.begin_transaction().await;
2970        dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
2971
2972        assert_eq!(
2973            dbtx.get_value(&PercentTestKey(100)).await,
2974            Some(TestVal(101))
2975        );
2976
2977        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2978
2979        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2980
2981        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2982
2983        // If the wildcard character ('%') is not handled properly, this will make
2984        // find_by_prefix return 5 results instead of 4
2985        dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
2986
2987        let expected_keys = 4;
2988        let returned_keys = dbtx
2989            .find_by_prefix(&PercentPrefixTestPrefix)
2990            .await
2991            .fold(0, |returned_keys, (key, value)| async move {
2992                if matches!(key, PercentTestKey(101)) {
2993                    assert!(value.eq(&TestVal(100)));
2994                }
2995                returned_keys + 1
2996            })
2997            .await;
2998
2999        assert_eq!(returned_keys, expected_keys);
3000    }
3001
3002    pub async fn verify_remove_by_prefix(db: Database) {
3003        let mut dbtx = db.begin_transaction().await;
3004
3005        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3006
3007        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3008
3009        dbtx.commit_tx().await;
3010
3011        let mut remove_dbtx = db.begin_transaction().await;
3012        remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3013        remove_dbtx.commit_tx().await;
3014
3015        let mut dbtx = db.begin_transaction().await;
3016        let expected_keys = 0;
3017        let returned_keys = dbtx
3018            .find_by_prefix(&DbPrefixTestPrefix)
3019            .await
3020            .fold(0, |returned_keys, (key, value)| async move {
3021                match key {
3022                    TestKey(100) => {
3023                        assert!(value.eq(&TestVal(101)));
3024                    }
3025                    TestKey(101) => {
3026                        assert!(value.eq(&TestVal(102)));
3027                    }
3028                    _ => {}
3029                };
3030                returned_keys + 1
3031            })
3032            .await;
3033
3034        assert_eq!(returned_keys, expected_keys);
3035    }
3036
3037    pub async fn verify_module_db(db: Database, module_db: Database) {
3038        let mut dbtx = db.begin_transaction().await;
3039
3040        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3041
3042        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3043
3044        dbtx.commit_tx().await;
3045
3046        // verify module_dbtx can only read key/value pairs from its own module
3047        let mut module_dbtx = module_db.begin_transaction().await;
3048        assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3049
3050        assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3051
3052        // verify module_dbtx can read key/value pairs that it wrote
3053        let mut dbtx = db.begin_transaction().await;
3054        assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3055
3056        assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3057
3058        let mut module_dbtx = module_db.begin_transaction().await;
3059
3060        module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3061
3062        module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3063
3064        module_dbtx.commit_tx().await;
3065
3066        let expected_keys = 2;
3067        let mut dbtx = db.begin_transaction().await;
3068        let returned_keys = dbtx
3069            .find_by_prefix(&DbPrefixTestPrefix)
3070            .await
3071            .fold(0, |returned_keys, (key, value)| async move {
3072                match key {
3073                    TestKey(100) => {
3074                        assert!(value.eq(&TestVal(101)));
3075                    }
3076                    TestKey(101) => {
3077                        assert!(value.eq(&TestVal(102)));
3078                    }
3079                    _ => {}
3080                };
3081                returned_keys + 1
3082            })
3083            .await;
3084
3085        assert_eq!(returned_keys, expected_keys);
3086
3087        let removed = dbtx.remove_entry(&TestKey(100)).await;
3088        assert_eq!(removed, Some(TestVal(101)));
3089        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3090
3091        let mut module_dbtx = module_db.begin_transaction().await;
3092        assert_eq!(
3093            module_dbtx.get_value(&TestKey(100)).await,
3094            Some(TestVal(103))
3095        );
3096    }
3097
3098    pub async fn verify_module_prefix(db: Database) {
3099        let mut test_dbtx = db.begin_transaction().await;
3100        {
3101            let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3102
3103            test_module_dbtx
3104                .insert_entry(&TestKey(100), &TestVal(101))
3105                .await;
3106
3107            test_module_dbtx
3108                .insert_entry(&TestKey(101), &TestVal(102))
3109                .await;
3110        }
3111
3112        test_dbtx.commit_tx().await;
3113
3114        let mut alt_dbtx = db.begin_transaction().await;
3115        {
3116            let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3117
3118            alt_module_dbtx
3119                .insert_entry(&TestKey(100), &TestVal(103))
3120                .await;
3121
3122            alt_module_dbtx
3123                .insert_entry(&TestKey(101), &TestVal(104))
3124                .await;
3125        }
3126
3127        alt_dbtx.commit_tx().await;
3128
3129        // verify test_module_dbtx can only see key/value pairs from its own module
3130        let mut test_dbtx = db.begin_transaction().await;
3131        let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3132        assert_eq!(
3133            test_module_dbtx.get_value(&TestKey(100)).await,
3134            Some(TestVal(101))
3135        );
3136
3137        assert_eq!(
3138            test_module_dbtx.get_value(&TestKey(101)).await,
3139            Some(TestVal(102))
3140        );
3141
3142        let expected_keys = 2;
3143        let returned_keys = test_module_dbtx
3144            .find_by_prefix(&DbPrefixTestPrefix)
3145            .await
3146            .fold(0, |returned_keys, (key, value)| async move {
3147                match key {
3148                    TestKey(100) => {
3149                        assert!(value.eq(&TestVal(101)));
3150                    }
3151                    TestKey(101) => {
3152                        assert!(value.eq(&TestVal(102)));
3153                    }
3154                    _ => {}
3155                };
3156                returned_keys + 1
3157            })
3158            .await;
3159
3160        assert_eq!(returned_keys, expected_keys);
3161
3162        let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3163        assert_eq!(removed, Some(TestVal(101)));
3164        assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3165
3166        // test_dbtx on its own wont find the key because it does not use a module
3167        // prefix
3168        let mut test_dbtx = db.begin_transaction().await;
3169        assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3170
3171        test_dbtx.commit_tx().await;
3172    }
3173
3174    #[cfg(test)]
3175    #[tokio::test]
3176    pub async fn verify_test_migration() {
3177        // Insert a bunch of old dummy data that needs to be migrated to a new version
3178        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3179        let expected_test_keys_size: usize = 100;
3180        let mut dbtx = db.begin_transaction().await;
3181        for i in 0..expected_test_keys_size {
3182            dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3183                .await;
3184        }
3185
3186        // Will also be migrated to `DatabaseVersionKey`
3187        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3188            .await;
3189        dbtx.commit_tx().await;
3190
3191        let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
3192
3193        migrations.insert(DatabaseVersion(0), |ctx| {
3194            migrate_test_db_version_0(ctx).boxed()
3195        });
3196
3197        apply_migrations(&db, "TestModule".to_string(), migrations, None, None)
3198            .await
3199            .expect("Error applying migrations for TestModule");
3200
3201        // Verify that the migrations completed successfully
3202        let mut dbtx = db.begin_transaction().await;
3203
3204        // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
3205        // to `DatabaseVersionKey`
3206        assert!(dbtx
3207            .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3208            .await
3209            .is_some());
3210
3211        // Verify Dummy module migration
3212        let test_keys = dbtx
3213            .find_by_prefix(&DbPrefixTestPrefix)
3214            .await
3215            .collect::<Vec<_>>()
3216            .await;
3217        let test_keys_size = test_keys.len();
3218        assert_eq!(test_keys_size, expected_test_keys_size);
3219        for (key, val) in test_keys {
3220            assert_eq!(key.0, val.0 + 1);
3221        }
3222    }
3223
3224    #[allow(dead_code)]
3225    async fn migrate_test_db_version_0(mut ctx: MigrationContext<'_>) -> Result<(), anyhow::Error> {
3226        let mut dbtx = ctx.dbtx();
3227        let example_keys_v0 = dbtx
3228            .find_by_prefix(&DbPrefixTestPrefixV0)
3229            .await
3230            .collect::<Vec<_>>()
3231            .await;
3232        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3233        for (key, val) in example_keys_v0 {
3234            let key_v2 = TestKey(key.1);
3235            dbtx.insert_new_entry(&key_v2, &val).await;
3236        }
3237        Ok(())
3238    }
3239
3240    #[cfg(test)]
3241    #[tokio::test]
3242    async fn test_autocommit() {
3243        use std::marker::PhantomData;
3244        use std::ops::Range;
3245        use std::path::Path;
3246
3247        use anyhow::anyhow;
3248        use async_trait::async_trait;
3249
3250        use crate::db::{
3251            AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3252            IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3253            IRawDatabaseTransaction,
3254        };
3255        use crate::ModuleDecoderRegistry;
3256
3257        #[derive(Debug)]
3258        struct FakeDatabase;
3259
3260        #[async_trait]
3261        impl IRawDatabase for FakeDatabase {
3262            type Transaction<'a> = FakeTransaction<'a>;
3263            async fn begin_transaction(&self) -> FakeTransaction {
3264                FakeTransaction(PhantomData)
3265            }
3266
3267            fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3268                Ok(())
3269            }
3270        }
3271
3272        #[derive(Debug)]
3273        struct FakeTransaction<'a>(PhantomData<&'a ()>);
3274
3275        #[async_trait]
3276        impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
3277            async fn raw_insert_bytes(
3278                &mut self,
3279                _key: &[u8],
3280                _value: &[u8],
3281            ) -> anyhow::Result<Option<Vec<u8>>> {
3282                unimplemented!()
3283            }
3284
3285            async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3286                unimplemented!()
3287            }
3288
3289            async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3290                unimplemented!()
3291            }
3292
3293            async fn raw_find_by_range(
3294                &mut self,
3295                _key_range: Range<&[u8]>,
3296            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3297                unimplemented!()
3298            }
3299
3300            async fn raw_find_by_prefix(
3301                &mut self,
3302                _key_prefix: &[u8],
3303            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3304                unimplemented!()
3305            }
3306
3307            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3308                unimplemented!()
3309            }
3310
3311            async fn raw_find_by_prefix_sorted_descending(
3312                &mut self,
3313                _key_prefix: &[u8],
3314            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3315                unimplemented!()
3316            }
3317        }
3318
3319        #[async_trait]
3320        impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
3321            async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3322                unimplemented!()
3323            }
3324
3325            async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3326                unimplemented!()
3327            }
3328        }
3329
3330        #[async_trait]
3331        impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
3332            async fn commit_tx(self) -> anyhow::Result<()> {
3333                Err(anyhow!("Can't commit!"))
3334            }
3335        }
3336
3337        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3338        let err = db
3339            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3340            .await
3341            .unwrap_err();
3342
3343        match err {
3344            AutocommitError::CommitFailed {
3345                attempts: failed_attempts,
3346                ..
3347            } => {
3348                assert_eq!(failed_attempts, 5);
3349            }
3350            AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3351        }
3352    }
3353}
3354
3355pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3356    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3357    decoders: ModuleDecoderRegistry,
3358    key_prefix: &KP,
3359) -> impl Stream<
3360    Item = (
3361        KP::Record,
3362        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3363    ),
3364> + 'r
3365where
3366    'inner: 'r,
3367    KP: DatabaseLookup,
3368    KP::Record: DatabaseKey,
3369{
3370    debug!("find by prefix sorted descending");
3371    let prefix_bytes = key_prefix.to_bytes();
3372    tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3373        .await
3374        .expect("Error doing prefix search in database")
3375        .map(move |(key_bytes, value_bytes)| {
3376            let key = decode_key_expect(&key_bytes, &decoders);
3377            let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3378            (key, value)
3379        })
3380}
3381
3382#[cfg(test)]
3383mod tests {
3384    use tokio::sync::oneshot;
3385
3386    use super::mem_impl::MemDatabase;
3387    use super::*;
3388    use crate::runtime::spawn;
3389
3390    async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> {
3391        let db = db.clone();
3392        let (tx, rx) = oneshot::channel::<()>();
3393        let join_handle = spawn("wait key exists", async move {
3394            let sub = db.wait_key_exists(&key);
3395            tx.send(()).unwrap();
3396            sub.await
3397        });
3398        rx.await.unwrap();
3399        join_handle
3400    }
3401
3402    #[tokio::test]
3403    async fn test_wait_key_before_transaction() {
3404        let key = TestKey(1);
3405        let val = TestVal(2);
3406        let db = MemDatabase::new().into_database();
3407
3408        let key_task = waiter(&db, TestKey(1)).await;
3409
3410        let mut tx = db.begin_transaction().await;
3411        tx.insert_new_entry(&key, &val).await;
3412        tx.commit_tx().await;
3413
3414        assert_eq!(
3415            future_returns_shortly(async { key_task.await.unwrap() }).await,
3416            Some(TestVal(2)),
3417            "should notify"
3418        );
3419    }
3420
3421    #[tokio::test]
3422    async fn test_wait_key_before_insert() {
3423        let key = TestKey(1);
3424        let val = TestVal(2);
3425        let db = MemDatabase::new().into_database();
3426
3427        let mut tx = db.begin_transaction().await;
3428        let key_task = waiter(&db, TestKey(1)).await;
3429        tx.insert_new_entry(&key, &val).await;
3430        tx.commit_tx().await;
3431
3432        assert_eq!(
3433            future_returns_shortly(async { key_task.await.unwrap() }).await,
3434            Some(TestVal(2)),
3435            "should notify"
3436        );
3437    }
3438
3439    #[tokio::test]
3440    async fn test_wait_key_after_insert() {
3441        let key = TestKey(1);
3442        let val = TestVal(2);
3443        let db = MemDatabase::new().into_database();
3444
3445        let mut tx = db.begin_transaction().await;
3446        tx.insert_new_entry(&key, &val).await;
3447
3448        let key_task = waiter(&db, TestKey(1)).await;
3449
3450        tx.commit_tx().await;
3451
3452        assert_eq!(
3453            future_returns_shortly(async { key_task.await.unwrap() }).await,
3454            Some(TestVal(2)),
3455            "should notify"
3456        );
3457    }
3458
3459    #[tokio::test]
3460    async fn test_wait_key_after_commit() {
3461        let key = TestKey(1);
3462        let val = TestVal(2);
3463        let db = MemDatabase::new().into_database();
3464
3465        let mut tx = db.begin_transaction().await;
3466        tx.insert_new_entry(&key, &val).await;
3467        tx.commit_tx().await;
3468
3469        let key_task = waiter(&db, TestKey(1)).await;
3470        assert_eq!(
3471            future_returns_shortly(async { key_task.await.unwrap() }).await,
3472            Some(TestVal(2)),
3473            "should notify"
3474        );
3475    }
3476
3477    #[tokio::test]
3478    async fn test_wait_key_isolated_db() {
3479        let module_instance_id = 10;
3480        let key = TestKey(1);
3481        let val = TestVal(2);
3482        let db = MemDatabase::new().into_database();
3483        let db = db.with_prefix_module_id(module_instance_id).0;
3484
3485        let key_task = waiter(&db, TestKey(1)).await;
3486
3487        let mut tx = db.begin_transaction().await;
3488        tx.insert_new_entry(&key, &val).await;
3489        tx.commit_tx().await;
3490
3491        assert_eq!(
3492            future_returns_shortly(async { key_task.await.unwrap() }).await,
3493            Some(TestVal(2)),
3494            "should notify"
3495        );
3496    }
3497
3498    #[tokio::test]
3499    async fn test_wait_key_isolated_tx() {
3500        let module_instance_id = 10;
3501        let key = TestKey(1);
3502        let val = TestVal(2);
3503        let db = MemDatabase::new().into_database();
3504
3505        let key_task = waiter(&db.with_prefix_module_id(module_instance_id).0, TestKey(1)).await;
3506
3507        let mut tx = db.begin_transaction().await;
3508        let mut tx_mod = tx.to_ref_with_prefix_module_id(module_instance_id).0;
3509        tx_mod.insert_new_entry(&key, &val).await;
3510        drop(tx_mod);
3511        tx.commit_tx().await;
3512
3513        assert_eq!(
3514            future_returns_shortly(async { key_task.await.unwrap() }).await,
3515            Some(TestVal(2)),
3516            "should notify"
3517        );
3518    }
3519
3520    #[tokio::test]
3521    async fn test_wait_key_no_transaction() {
3522        let db = MemDatabase::new().into_database();
3523
3524        let key_task = waiter(&db, TestKey(1)).await;
3525        assert_eq!(
3526            future_returns_shortly(async { key_task.await.unwrap() }).await,
3527            None,
3528            "should not notify"
3529        );
3530    }
3531
3532    #[tokio::test]
3533    async fn test_prefix_global_dbtx() {
3534        let module_instance_id = 10;
3535        let db = MemDatabase::new().into_database();
3536
3537        {
3538            // Plain module id prefix, can use `global_dbtx` to access global_dbtx
3539            let (db, access_token) = db.with_prefix_module_id(module_instance_id);
3540
3541            let mut tx = db.begin_transaction().await;
3542            let mut tx = tx.global_dbtx(access_token);
3543            tx.insert_new_entry(&TestKey(1), &TestVal(1)).await;
3544            tx.commit_tx().await;
3545        }
3546
3547        assert_eq!(
3548            db.begin_transaction_nc().await.get_value(&TestKey(1)).await,
3549            Some(TestVal(1))
3550        );
3551
3552        {
3553            // Additional non-module inner prefix, does not interfere with `global_dbtx`
3554            let (db, access_token) = db.with_prefix_module_id(module_instance_id);
3555
3556            let db = db.with_prefix(vec![3, 4]);
3557
3558            let mut tx = db.begin_transaction().await;
3559            let mut tx = tx.global_dbtx(access_token);
3560            tx.insert_new_entry(&TestKey(2), &TestVal(2)).await;
3561            tx.commit_tx().await;
3562        }
3563
3564        assert_eq!(
3565            db.begin_transaction_nc().await.get_value(&TestKey(2)).await,
3566            Some(TestVal(2))
3567        );
3568    }
3569
3570    #[tokio::test]
3571    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3572    async fn test_prefix_global_dbtx_panics_on_global_db() {
3573        let db = MemDatabase::new().into_database();
3574
3575        let mut tx = db.begin_transaction().await;
3576        let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
3577    }
3578
3579    #[tokio::test]
3580    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3581    async fn test_prefix_global_dbtx_panics_on_non_module_prefix() {
3582        let db = MemDatabase::new().into_database();
3583
3584        let prefix = vec![3, 4];
3585        let db = db.with_prefix(prefix.clone());
3586
3587        let mut tx = db.begin_transaction().await;
3588        let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&prefix));
3589    }
3590
3591    #[tokio::test]
3592    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3593    async fn test_prefix_global_dbtx_panics_on_wrong_access_token() {
3594        let db = MemDatabase::new().into_database();
3595
3596        let prefix = vec![3, 4];
3597        let db = db.with_prefix(prefix.clone());
3598
3599        let mut tx = db.begin_transaction().await;
3600        let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
3601    }
3602}