fedimint_core/db/
mod.rs

1//! Core Fedimint database traits and types
2//!
3//! This module provides the core key-value database for Fedimint.
4//!
5//! # Usage
6//!
7//! To use the database, you typically follow these steps:
8//!
9//! 1. Create a `Database` instance
10//! 2. Begin a transaction
11//! 3. Perform operations within the transaction
12//! 4. Commit the transaction
13//!
14//! ## Example
15//!
16//! ```rust
17//! use fedimint_core::db::mem_impl::MemDatabase;
18//! use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19//! use fedimint_core::encoding::{Decodable, Encodable};
20//! use fedimint_core::impl_db_record;
21//! use fedimint_core::module::registry::ModuleDecoderRegistry;
22//!
23//! #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
24//! pub struct TestKey(pub u64);
25//! #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
26//! pub struct TestVal(pub u64);
27//!
28//! #[repr(u8)]
29//! #[derive(Clone)]
30//! pub enum TestDbKeyPrefix {
31//!     Test = 0x42,
32//! }
33//!
34//! impl_db_record!(
35//!     key = TestKey,
36//!     value = TestVal,
37//!     db_prefix = TestDbKeyPrefix::Test,
38//! );
39//!
40//! # async fn example() {
41//! // Create a new in-memory database
42//! let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
43//!
44//! // Begin a transaction
45//! let mut tx = db.begin_transaction().await;
46//!
47//! // Perform operations
48//! tx.insert_entry(&TestKey(1), &TestVal(100)).await;
49//! let value = tx.get_value(&TestKey(1)).await;
50//!
51//! // Commit the transaction
52//! tx.commit_tx().await;
53//!
54//! // For operations that may need to be retried due to conflicts, use the
55//! // `autocommit` function:
56//!
57//! db.autocommit(
58//!     |dbtx, _| {
59//!         Box::pin(async move {
60//!             dbtx.insert_entry(&TestKey(1), &TestVal(100)).await;
61//!             anyhow::Ok(())
62//!         })
63//!     },
64//!     None,
65//! )
66//! .await
67//! .unwrap();
68//! # }
69//! ```
70//!
71//! # Isolation of database transactions
72//!
73//! Fedimint requires that the database implementation implement Snapshot
74//! Isolation. Snapshot Isolation is a database isolation level that guarantees
75//! consistent reads from the time that the snapshot was created (at transaction
76//! creation time). Transactions with Snapshot Isolation level will only commit
77//! if there has been no write to the modified keys since the snapshot (i.e.
78//! write-write conflicts are prevented).
79//!
80//! Specifically, Fedimint expects the database implementation to prevent the
81//! following anomalies:
82//!
83//! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
84//! at time (t + i)
85//!
86//! Dirty Read: TX1 is able to read TX2's uncommitted writes.
87//!
88//! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
89//! time (t + i) where V1 != V2.
90//!
91//! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
92//! retrieves Y number of records for the same prefix at time (t + i).
93//!
94//! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
95//! overwrites V1 as the value for K1 (write-write conflict).
96//!
97//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom
98//! Record | Lost Writes | | -------- | ------------------ | ---------- |
99//! ------------------- | -------------- | ----------- | | MemoryDB | Prevented
100//! | Prevented  | Prevented           | Prevented      | Possible    |
101//! | RocksDB  | Prevented          | Prevented  | Prevented           |
102//! Prevented      | Prevented   | | Sqlite   | Prevented          | Prevented
103//! | Prevented           | Prevented      | Prevented   |
104
105use std::any;
106use std::collections::BTreeMap;
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{bail, Context, Result};
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use futures::{Stream, StreamExt};
120use macro_rules_attribute::apply;
121use rand::Rng;
122use serde::Serialize;
123use strum_macros::EnumIter;
124use thiserror::Error;
125use tracing::{debug, error, info, instrument, trace, warn};
126
127use crate::core::ModuleInstanceId;
128use crate::encoding::{Decodable, Encodable};
129use crate::fmt_utils::AbbreviateHexBytes;
130use crate::task::{MaybeSend, MaybeSync};
131use crate::util::FmtCompactAnyhow as _;
132use crate::{async_trait_maybe_send, maybe_add_send, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
142pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
143
144pub trait DatabaseKeyPrefix: Debug {
145    fn to_bytes(&self) -> Vec<u8>;
146}
147
148/// A key + value pair in the database with a unique prefix
149/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
150pub trait DatabaseRecord: DatabaseKeyPrefix {
151    const DB_PREFIX: u8;
152    const NOTIFY_ON_MODIFY: bool = false;
153    type Key: DatabaseKey + Debug;
154    type Value: DatabaseValue + Debug;
155}
156
157/// A key that can be used to query one or more `DatabaseRecord`
158/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
159pub trait DatabaseLookup: DatabaseKeyPrefix {
160    type Record: DatabaseRecord;
161}
162
163// Every `DatabaseRecord` is automatically a `DatabaseLookup`
164impl<Record> DatabaseLookup for Record
165where
166    Record: DatabaseRecord + Debug + Decodable + Encodable,
167{
168    type Record = Record;
169}
170
171/// `DatabaseKey` that represents the lookup structure for retrieving key/value
172/// pairs from the database.
173pub trait DatabaseKey: Sized {
174    /// Send a notification to tasks waiting to be notified if the value of
175    /// `DatabaseKey` is modified
176    ///
177    /// For instance, this can be used to be notified when a key in the
178    /// database is created. It is also possible to run a closure with the
179    /// value of the `DatabaseKey` as parameter to verify some changes to
180    /// that value.
181    const NOTIFY_ON_MODIFY: bool = false;
182    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
183}
184
185/// Marker trait for `DatabaseKey`s where `NOTIFY` is true
186pub trait DatabaseKeyWithNotify {}
187
188/// `DatabaseValue` that represents the value structure of database records.
189pub trait DatabaseValue: Sized + Debug {
190    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
191    fn to_bytes(&self) -> Vec<u8>;
192}
193
194pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
195
196/// Just ignore this type, it's only there to make compiler happy
197///
198/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
199pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
200
201/// Error returned when the autocommit function fails
202#[derive(Debug, Error)]
203pub enum AutocommitError<E> {
204    /// Committing the transaction failed too many times, giving up
205    #[error("Commit Failed: {last_error}")]
206    CommitFailed {
207        /// Number of attempts
208        attempts: usize,
209        /// Last error on commit
210        last_error: anyhow::Error,
211    },
212    /// Error returned by the closure provided to `autocommit`. If returned no
213    /// commit was attempted in that round
214    #[error("Closure error: {error}")]
215    ClosureError {
216        /// The attempt on which the closure returned an error
217        ///
218        /// Values other than 0 typically indicate a logic error since the
219        /// closure given to `autocommit` should not have side effects
220        /// and thus keep succeeding if it succeeded once.
221        attempts: usize,
222        /// Error returned by the closure
223        error: E,
224    },
225}
226
227/// Raw database implementation
228///
229/// This and [`IRawDatabaseTransaction`] are meant to be implemented
230/// by crates like `fedimint-rocksdb` to provide a concrete implementation
231/// of a database to be used by Fedimint.
232///
233/// This is in contrast of [`IDatabase`] which includes extra
234/// functionality that Fedimint needs (and adds) on top of it.
235#[apply(async_trait_maybe_send!)]
236pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
237    /// A raw database transaction type
238    type Transaction<'a>: IRawDatabaseTransaction + Debug;
239
240    /// Start a database transaction
241    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
242
243    // Checkpoint the database to a backup directory
244    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
245}
246
247#[apply(async_trait_maybe_send!)]
248impl<T> IRawDatabase for Box<T>
249where
250    T: IRawDatabase,
251{
252    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
253
254    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
255        (**self).begin_transaction().await
256    }
257
258    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
259        (**self).checkpoint(backup_path)
260    }
261}
262
263/// An extension trait with convenience operations on [`IRawDatabase`]
264pub trait IRawDatabaseExt: IRawDatabase + Sized {
265    /// Convert to type implementing [`IRawDatabase`] into [`Database`].
266    ///
267    /// When type inference is not an issue, [`Into::into`] can be used instead.
268    fn into_database(self) -> Database {
269        Database::new(self, ModuleRegistry::default())
270    }
271}
272
273impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
274
275impl<T> From<T> for Database
276where
277    T: IRawDatabase,
278{
279    fn from(raw: T) -> Self {
280        Self::new(raw, ModuleRegistry::default())
281    }
282}
283
284/// A database that on top of a raw database operation, implements
285/// key notification system.
286#[apply(async_trait_maybe_send!)]
287pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
288    /// Start a database transaction
289    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
290    /// Register (and wait) for `key` updates
291    async fn register(&self, key: &[u8]);
292    /// Notify about `key` update (creation, modification, deletion)
293    async fn notify(&self, key: &[u8]);
294
295    /// The prefix len of this database refers to the global (as opposed to
296    /// module-isolated) key space
297    fn is_global(&self) -> bool;
298
299    /// Checkpoints the database to a backup directory
300    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
301}
302
303#[apply(async_trait_maybe_send!)]
304impl<T> IDatabase for Arc<T>
305where
306    T: IDatabase + ?Sized,
307{
308    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
309        (**self).begin_transaction().await
310    }
311    async fn register(&self, key: &[u8]) {
312        (**self).register(key).await;
313    }
314    async fn notify(&self, key: &[u8]) {
315        (**self).notify(key).await;
316    }
317
318    fn is_global(&self) -> bool {
319        (**self).is_global()
320    }
321
322    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
323        (**self).checkpoint(backup_path)
324    }
325}
326
327/// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
328///
329/// Mostly notification system, but also run-time single-commit handling.
330struct BaseDatabase<RawDatabase> {
331    notifications: Arc<Notifications>,
332    raw: RawDatabase,
333}
334
335impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
336    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337        f.write_str("BaseDatabase")
338    }
339}
340
341#[apply(async_trait_maybe_send!)]
342impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
343    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
344        Box::new(BaseDatabaseTransaction::new(
345            self.raw.begin_transaction().await,
346            self.notifications.clone(),
347        ))
348    }
349    async fn register(&self, key: &[u8]) {
350        self.notifications.register(key).await;
351    }
352    async fn notify(&self, key: &[u8]) {
353        self.notifications.notify(key);
354    }
355
356    fn is_global(&self) -> bool {
357        true
358    }
359
360    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
361        self.raw.checkpoint(backup_path)
362    }
363}
364
365/// A public-facing newtype over `IDatabase`
366///
367/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
368/// and implements common utility function for auto-commits, db isolation,
369/// and other.
370#[derive(Clone, Debug)]
371pub struct Database {
372    inner: Arc<dyn IDatabase + 'static>,
373    module_decoders: ModuleDecoderRegistry,
374}
375
376impl Database {
377    pub fn strong_count(&self) -> usize {
378        Arc::strong_count(&self.inner)
379    }
380
381    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
382        self.inner
383    }
384}
385
386impl Database {
387    /// Creates a new Fedimint database from any object implementing
388    /// [`IDatabase`].
389    ///
390    /// See also [`Database::new_from_arc`].
391    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
392        let inner = BaseDatabase {
393            raw,
394            notifications: Arc::new(Notifications::new()),
395        };
396        Self::new_from_arc(
397            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
398            module_decoders,
399        )
400    }
401
402    /// Create [`Database`] from an already typed-erased `IDatabase`.
403    pub fn new_from_arc(
404        inner: Arc<dyn IDatabase + 'static>,
405        module_decoders: ModuleDecoderRegistry,
406    ) -> Self {
407        Self {
408            inner,
409            module_decoders,
410        }
411    }
412
413    /// Create [`Database`] isolated to a partition with a given `prefix`
414    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
415        Self {
416            inner: Arc::new(PrefixDatabase {
417                inner: self.inner.clone(),
418                global_dbtx_access_token: None,
419                prefix,
420            }),
421            module_decoders: self.module_decoders.clone(),
422        }
423    }
424
425    /// Create [`Database`] isolated to a partition with a prefix for a given
426    /// `module_instance_id`, allowing the module to access `global_dbtx` with
427    /// the right `access_token`
428    pub fn with_prefix_module_id(
429        &self,
430        module_instance_id: ModuleInstanceId,
431    ) -> (Self, GlobalDBTxAccessToken) {
432        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
433        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
434        (
435            Self {
436                inner: Arc::new(PrefixDatabase {
437                    inner: self.inner.clone(),
438                    global_dbtx_access_token: Some(global_dbtx_access_token),
439                    prefix,
440                }),
441                module_decoders: self.module_decoders.clone(),
442            },
443            global_dbtx_access_token,
444        )
445    }
446
447    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
448        Self {
449            inner: self.inner.clone(),
450            module_decoders,
451        }
452    }
453
454    /// Is this `Database` a global, unpartitioned `Database`
455    pub fn is_global(&self) -> bool {
456        self.inner.is_global()
457    }
458
459    /// `Err` if [`Self::is_global`] is not true
460    pub fn ensure_global(&self) -> Result<()> {
461        if !self.is_global() {
462            bail!("Database instance not global");
463        }
464
465        Ok(())
466    }
467
468    /// `Err` if [`Self::is_global`] is true
469    pub fn ensure_isolated(&self) -> Result<()> {
470        if self.is_global() {
471            bail!("Database instance not isolated");
472        }
473
474        Ok(())
475    }
476
477    /// Begin a new committable database transaction
478    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
479    where
480        's: 'tx,
481    {
482        DatabaseTransaction::<Committable>::new(
483            self.inner.begin_transaction().await,
484            self.module_decoders.clone(),
485        )
486    }
487
488    /// Begin a new non-committable database transaction
489    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
490    where
491        's: 'tx,
492    {
493        self.begin_transaction().await.into_nc()
494    }
495
496    pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
497        self.inner.checkpoint(backup_path)
498    }
499
500    /// Runs a closure with a reference to a database transaction and tries to
501    /// commit the transaction if the closure returns `Ok` and rolls it back
502    /// otherwise. If committing fails the closure is run for up to
503    /// `max_attempts` times. If `max_attempts` is `None` it will run
504    /// `usize::MAX` times which is close enough to infinite times.
505    ///
506    /// The closure `tx_fn` provided should not have side effects outside of the
507    /// database transaction provided, or if it does these should be
508    /// idempotent, since the closure might be run multiple times.
509    ///
510    /// # Lifetime Parameters
511    ///
512    /// The higher rank trait bound (HRTB) `'a` that is applied to the the
513    /// mutable reference to the database transaction ensures that the
514    /// reference lives as least as long as the returned future of the
515    /// closure.
516    ///
517    /// Further, the reference to self (`'s`) must outlive the
518    /// `DatabaseTransaction<'dt>`. In other words, the
519    /// `DatabaseTransaction` must live as least as long as `self` and that is
520    /// true as the `DatabaseTransaction` is only dropped at the end of the
521    /// `loop{}`.
522    ///
523    /// # Panics
524    ///
525    /// This function panics when the given number of maximum attempts is zero.
526    /// `max_attempts` must be greater or equal to one.
527    pub async fn autocommit<'s, 'dbtx, F, T, E>(
528        &'s self,
529        tx_fn: F,
530        max_attempts: Option<usize>,
531    ) -> Result<T, AutocommitError<E>>
532    where
533        's: 'dbtx,
534        for<'r, 'o> F: Fn(
535            &'r mut DatabaseTransaction<'o>,
536            PhantomBound<'dbtx, 'o>,
537        ) -> BoxFuture<'r, Result<T, E>>,
538    {
539        assert_ne!(max_attempts, Some(0));
540        let mut curr_attempts: usize = 0;
541
542        loop {
543            // The `checked_add()` function is used to catch the `usize` overflow.
544            // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
545            // after ~50 days. But if that's the case, something else must be wrong.
546            // With `usize=64bit` it would take much longer, obviously.
547            curr_attempts = curr_attempts
548                .checked_add(1)
549                .expect("db autocommit attempt counter overflowed");
550
551            let mut dbtx = self.begin_transaction().await;
552
553            let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
554            let val = match tx_fn_res {
555                Ok(val) => val,
556                Err(err) => {
557                    dbtx.ignore_uncommitted();
558                    return Err(AutocommitError::ClosureError {
559                        attempts: curr_attempts,
560                        error: err,
561                    });
562                }
563            };
564
565            let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");
566
567            match dbtx.commit_tx_result().await {
568                Ok(()) => {
569                    return Ok(val);
570                }
571                Err(err) => {
572                    if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
573                        warn!(
574                            target: LOG_DB,
575                            curr_attempts,
576                            ?err,
577                            "Database commit failed in an autocommit block - terminating"
578                        );
579                        return Err(AutocommitError::CommitFailed {
580                            attempts: curr_attempts,
581                            last_error: err,
582                        });
583                    }
584
585                    let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
586                    let delay = rand::thread_rng().gen_range(delay..(2 * delay));
587                    warn!(
588                        target: LOG_DB,
589                        curr_attempts,
590                        err = %err.fmt_compact_anyhow(),
591                        delay_ms = %delay,
592                        "Database commit failed in an autocommit block - retrying"
593                    );
594                    crate::runtime::sleep(Duration::from_millis(delay)).await;
595                }
596            }
597        }
598    }
599
600    /// Waits for key to be notified.
601    ///
602    /// Calls the `checker` when value of the key may have changed.
603    /// Returns the value when `checker` returns a `Some(T)`.
604    pub async fn wait_key_check<'a, K, T>(
605        &'a self,
606        key: &K,
607        checker: impl Fn(Option<K::Value>) -> Option<T>,
608    ) -> (T, DatabaseTransaction<'a, Committable>)
609    where
610        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
611    {
612        let key_bytes = key.to_bytes();
613        loop {
614            // register for notification
615            let notify = self.inner.register(&key_bytes);
616
617            // check for value in db
618            let mut tx = self.inner.begin_transaction().await;
619
620            let maybe_value_bytes = tx
621                .raw_get_bytes(&key_bytes)
622                .await
623                .expect("Unrecoverable error when reading from database")
624                .map(|value_bytes| {
625                    decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
626                });
627
628            if let Some(value) = checker(maybe_value_bytes) {
629                return (
630                    value,
631                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
632                );
633            }
634
635            // key not found, try again
636            notify.await;
637            // if miss a notification between await and next register, it is
638            // fine. because we are going check the database
639        }
640    }
641
642    /// Waits for key to be present in database.
643    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
644    where
645        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
646    {
647        self.wait_key_check(key, std::convert::identity).await.0
648    }
649}
650
651fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
652    let mut bytes = vec![MODULE_GLOBAL_PREFIX];
653    bytes.append(&mut module_instance_id.consensus_encode_to_vec());
654    bytes
655}
656
657/// A database that wraps an `inner` one and adds a prefix to all operations,
658/// effectively creating an isolated partition.
659#[derive(Clone, Debug)]
660struct PrefixDatabase<Inner>
661where
662    Inner: Debug,
663{
664    prefix: Vec<u8>,
665    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
666    inner: Inner,
667}
668
669impl<Inner> PrefixDatabase<Inner>
670where
671    Inner: Debug,
672{
673    // TODO: we should optimize these concatenations, maybe by having an internal
674    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
675    // something
676    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
677        let mut full_key = self.prefix.clone();
678        full_key.extend_from_slice(key);
679        full_key
680    }
681}
682
683#[apply(async_trait_maybe_send!)]
684impl<Inner> IDatabase for PrefixDatabase<Inner>
685where
686    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
687{
688    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
689        Box::new(PrefixDatabaseTransaction {
690            inner: self.inner.begin_transaction().await,
691            global_dbtx_access_token: self.global_dbtx_access_token,
692            prefix: self.prefix.clone(),
693        })
694    }
695    async fn register(&self, key: &[u8]) {
696        self.inner.register(&self.get_full_key(key)).await;
697    }
698
699    async fn notify(&self, key: &[u8]) {
700        self.inner.notify(&self.get_full_key(key)).await;
701    }
702
703    fn is_global(&self) -> bool {
704        if self.global_dbtx_access_token.is_some() {
705            false
706        } else {
707            self.inner.is_global()
708        }
709    }
710
711    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
712        self.inner.checkpoint(backup_path)
713    }
714}
715
716/// A database transactions that wraps an `inner` one and adds a prefix to all
717/// operations, effectively creating an isolated partition.
718///
719/// Produced by [`PrefixDatabase`].
720#[derive(Debug)]
721struct PrefixDatabaseTransaction<Inner> {
722    inner: Inner,
723    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
724    prefix: Vec<u8>,
725}
726
727impl<Inner> PrefixDatabaseTransaction<Inner> {
728    // TODO: we should optimize these concatenations, maybe by having an internal
729    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
730    // something
731    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
732        let mut full_key = self.prefix.clone();
733        full_key.extend_from_slice(key);
734        full_key
735    }
736
737    fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
738        Range {
739            start: self.get_full_key(range.start),
740            end: self.get_full_key(range.end),
741        }
742    }
743
744    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
745        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
746    }
747}
748
749#[apply(async_trait_maybe_send!)]
750impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
751where
752    Inner: IDatabaseTransaction,
753{
754    async fn commit_tx(&mut self) -> Result<()> {
755        self.inner.commit_tx().await
756    }
757
758    fn is_global(&self) -> bool {
759        if self.global_dbtx_access_token.is_some() {
760            false
761        } else {
762            self.inner.is_global()
763        }
764    }
765
766    fn global_dbtx(
767        &mut self,
768        access_token: GlobalDBTxAccessToken,
769    ) -> &mut dyn IDatabaseTransaction {
770        if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
771            assert_eq!(
772                access_token, self_global_dbtx_access_token,
773                "Invalid access key used to access global_dbtx"
774            );
775            &mut self.inner
776        } else {
777            self.inner.global_dbtx(access_token)
778        }
779    }
780}
781
782#[apply(async_trait_maybe_send!)]
783impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
784where
785    Inner: IDatabaseTransactionOpsCore,
786{
787    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
788        let key = self.get_full_key(key);
789        self.inner.raw_insert_bytes(&key, value).await
790    }
791
792    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
793        let key = self.get_full_key(key);
794        self.inner.raw_get_bytes(&key).await
795    }
796
797    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
798        let key = self.get_full_key(key);
799        self.inner.raw_remove_entry(&key).await
800    }
801
802    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
803        let key = self.get_full_key(key_prefix);
804        let stream = self.inner.raw_find_by_prefix(&key).await?;
805        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
806    }
807
808    async fn raw_find_by_prefix_sorted_descending(
809        &mut self,
810        key_prefix: &[u8],
811    ) -> Result<PrefixStream<'_>> {
812        let key = self.get_full_key(key_prefix);
813        let stream = self
814            .inner
815            .raw_find_by_prefix_sorted_descending(&key)
816            .await?;
817        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
818    }
819
820    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
821        let range = self.get_full_range(range);
822        let stream = self
823            .inner
824            .raw_find_by_range(Range {
825                start: &range.start,
826                end: &range.end,
827            })
828            .await?;
829        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
830    }
831
832    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
833        let key = self.get_full_key(key_prefix);
834        self.inner.raw_remove_by_prefix(&key).await
835    }
836}
837
838#[apply(async_trait_maybe_send!)]
839impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
840where
841    Inner: IDatabaseTransactionOps,
842{
843    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
844        self.inner.rollback_tx_to_savepoint().await
845    }
846
847    async fn set_tx_savepoint(&mut self) -> Result<()> {
848        self.set_tx_savepoint().await
849    }
850}
851
852/// Core raw a operations database transactions supports
853///
854/// Used to enforce the same signature on all types supporting it
855#[apply(async_trait_maybe_send!)]
856pub trait IDatabaseTransactionOpsCore: MaybeSend {
857    /// Insert entry
858    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
859
860    /// Get key value
861    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
862
863    /// Remove entry by `key`
864    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
865
866    /// Returns an stream of key-value pairs with keys that start with
867    /// `key_prefix`, sorted by key.
868    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
869
870    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
871    async fn raw_find_by_prefix_sorted_descending(
872        &mut self,
873        key_prefix: &[u8],
874    ) -> Result<PrefixStream<'_>>;
875
876    /// Returns an stream of key-value pairs with keys within a `range`, sorted
877    /// by key. [`Range`] is an (half-open) range bounded inclusively below and
878    /// exclusively above.
879    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
880
881    /// Delete keys matching prefix
882    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
883}
884
885#[apply(async_trait_maybe_send!)]
886impl<T> IDatabaseTransactionOpsCore for Box<T>
887where
888    T: IDatabaseTransactionOpsCore + ?Sized,
889{
890    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
891        (**self).raw_insert_bytes(key, value).await
892    }
893
894    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
895        (**self).raw_get_bytes(key).await
896    }
897
898    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
899        (**self).raw_remove_entry(key).await
900    }
901
902    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
903        (**self).raw_find_by_prefix(key_prefix).await
904    }
905
906    async fn raw_find_by_prefix_sorted_descending(
907        &mut self,
908        key_prefix: &[u8],
909    ) -> Result<PrefixStream<'_>> {
910        (**self)
911            .raw_find_by_prefix_sorted_descending(key_prefix)
912            .await
913    }
914
915    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
916        (**self).raw_find_by_range(range).await
917    }
918
919    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
920        (**self).raw_remove_by_prefix(key_prefix).await
921    }
922}
923
924#[apply(async_trait_maybe_send!)]
925impl<T> IDatabaseTransactionOpsCore for &mut T
926where
927    T: IDatabaseTransactionOpsCore + ?Sized,
928{
929    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
930        (**self).raw_insert_bytes(key, value).await
931    }
932
933    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
934        (**self).raw_get_bytes(key).await
935    }
936
937    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
938        (**self).raw_remove_entry(key).await
939    }
940
941    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
942        (**self).raw_find_by_prefix(key_prefix).await
943    }
944
945    async fn raw_find_by_prefix_sorted_descending(
946        &mut self,
947        key_prefix: &[u8],
948    ) -> Result<PrefixStream<'_>> {
949        (**self)
950            .raw_find_by_prefix_sorted_descending(key_prefix)
951            .await
952    }
953
954    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
955        (**self).raw_find_by_range(range).await
956    }
957
958    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
959        (**self).raw_remove_by_prefix(key_prefix).await
960    }
961}
962
963/// Additional operations (only some) database transactions expose, on top of
964/// [`IDatabaseTransactionOpsCore`]
965///
966/// In certain contexts exposing these operations would be a problem, so they
967/// are moved to a separate trait.
968#[apply(async_trait_maybe_send!)]
969pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
970    /// Create a savepoint during the transaction that can be rolled back to
971    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
972    /// atomically remove the writes that were applied since the savepoint
973    /// was created.
974    ///
975    /// Warning: Avoid using this in fedimint client code as not all database
976    /// transaction implementations will support setting a savepoint during
977    /// a transaction.
978    async fn set_tx_savepoint(&mut self) -> Result<()>;
979
980    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
981}
982
983#[apply(async_trait_maybe_send!)]
984impl<T> IDatabaseTransactionOps for Box<T>
985where
986    T: IDatabaseTransactionOps + ?Sized,
987{
988    async fn set_tx_savepoint(&mut self) -> Result<()> {
989        (**self).set_tx_savepoint().await
990    }
991
992    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
993        (**self).rollback_tx_to_savepoint().await
994    }
995}
996
997#[apply(async_trait_maybe_send!)]
998impl<T> IDatabaseTransactionOps for &mut T
999where
1000    T: IDatabaseTransactionOps + ?Sized,
1001{
1002    async fn set_tx_savepoint(&mut self) -> Result<()> {
1003        (**self).set_tx_savepoint().await
1004    }
1005
1006    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1007        (**self).rollback_tx_to_savepoint().await
1008    }
1009}
1010
1011/// Like [`IDatabaseTransactionOpsCore`], but typed
1012///
1013/// Implemented via blanket impl for everything that implements
1014/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
1015/// [`WithDecoders`]).
1016#[apply(async_trait_maybe_send!)]
1017pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1018    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1019    where
1020        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1021
1022    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1023    where
1024        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1025        K::Value: MaybeSend + MaybeSync;
1026
1027    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1028    where
1029        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1030        K::Value: MaybeSend + MaybeSync;
1031
1032    async fn find_by_range<K>(
1033        &mut self,
1034        key_range: Range<K>,
1035    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1036    where
1037        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1038        K::Value: MaybeSend + MaybeSync;
1039
1040    async fn find_by_prefix<KP>(
1041        &mut self,
1042        key_prefix: &KP,
1043    ) -> Pin<
1044        Box<
1045            maybe_add_send!(
1046                dyn Stream<
1047                        Item = (
1048                            KP::Record,
1049                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1050                        ),
1051                    > + '_
1052            ),
1053        >,
1054    >
1055    where
1056        KP: DatabaseLookup + MaybeSend + MaybeSync,
1057        KP::Record: DatabaseKey;
1058
1059    async fn find_by_prefix_sorted_descending<KP>(
1060        &mut self,
1061        key_prefix: &KP,
1062    ) -> Pin<
1063        Box<
1064            maybe_add_send!(
1065                dyn Stream<
1066                        Item = (
1067                            KP::Record,
1068                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1069                        ),
1070                    > + '_
1071            ),
1072        >,
1073    >
1074    where
1075        KP: DatabaseLookup + MaybeSend + MaybeSync,
1076        KP::Record: DatabaseKey;
1077
1078    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1079    where
1080        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1081
1082    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1083    where
1084        KP: DatabaseLookup + MaybeSend + MaybeSync;
1085}
1086
1087// blanket implementation of typed ops for anything that implements raw ops and
1088// has decoders
1089#[apply(async_trait_maybe_send!)]
1090impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1091where
1092    T: IDatabaseTransactionOpsCore + WithDecoders,
1093{
1094    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1095    where
1096        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1097    {
1098        let key_bytes = key.to_bytes();
1099        let raw = self
1100            .raw_get_bytes(&key_bytes)
1101            .await
1102            .expect("Unrecoverable error occurred while reading and entry from the database");
1103        raw.map(|value_bytes| {
1104            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1105        })
1106    }
1107
1108    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1109    where
1110        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1111        K::Value: MaybeSend + MaybeSync,
1112    {
1113        let key_bytes = key.to_bytes();
1114        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1115            .await
1116            .expect("Unrecoverable error occurred while inserting entry into the database")
1117            .map(|value_bytes| {
1118                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1119            })
1120    }
1121
1122    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1123    where
1124        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1125        K::Value: MaybeSend + MaybeSync,
1126    {
1127        if let Some(prev) = self.insert_entry(key, value).await {
1128            panic!(
1129                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1130            );
1131        }
1132    }
1133
1134    async fn find_by_range<K>(
1135        &mut self,
1136        key_range: Range<K>,
1137    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1138    where
1139        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1140        K::Value: MaybeSend + MaybeSync,
1141    {
1142        let decoders = self.decoders().clone();
1143        Box::pin(
1144            self.raw_find_by_range(Range {
1145                start: &key_range.start.to_bytes(),
1146                end: &key_range.end.to_bytes(),
1147            })
1148            .await
1149            .expect("Unrecoverable error occurred while listing entries from the database")
1150            .map(move |(key_bytes, value_bytes)| {
1151                let key = decode_key_expect(&key_bytes, &decoders);
1152                let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1153                (key, value)
1154            }),
1155        )
1156    }
1157
1158    async fn find_by_prefix<KP>(
1159        &mut self,
1160        key_prefix: &KP,
1161    ) -> Pin<
1162        Box<
1163            maybe_add_send!(
1164                dyn Stream<
1165                        Item = (
1166                            KP::Record,
1167                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1168                        ),
1169                    > + '_
1170            ),
1171        >,
1172    >
1173    where
1174        KP: DatabaseLookup + MaybeSend + MaybeSync,
1175        KP::Record: DatabaseKey,
1176    {
1177        let decoders = self.decoders().clone();
1178        Box::pin(
1179            self.raw_find_by_prefix(&key_prefix.to_bytes())
1180                .await
1181                .expect("Unrecoverable error occurred while listing entries from the database")
1182                .map(move |(key_bytes, value_bytes)| {
1183                    let key = decode_key_expect(&key_bytes, &decoders);
1184                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1185                    (key, value)
1186                }),
1187        )
1188    }
1189
1190    async fn find_by_prefix_sorted_descending<KP>(
1191        &mut self,
1192        key_prefix: &KP,
1193    ) -> Pin<
1194        Box<
1195            maybe_add_send!(
1196                dyn Stream<
1197                        Item = (
1198                            KP::Record,
1199                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1200                        ),
1201                    > + '_
1202            ),
1203        >,
1204    >
1205    where
1206        KP: DatabaseLookup + MaybeSend + MaybeSync,
1207        KP::Record: DatabaseKey,
1208    {
1209        let decoders = self.decoders().clone();
1210        Box::pin(
1211            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1212                .await
1213                .expect("Unrecoverable error occurred while listing entries from the database")
1214                .map(move |(key_bytes, value_bytes)| {
1215                    let key = decode_key_expect(&key_bytes, &decoders);
1216                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1217                    (key, value)
1218                }),
1219        )
1220    }
1221    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1222    where
1223        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1224    {
1225        let key_bytes = key.to_bytes();
1226        self.raw_remove_entry(&key_bytes)
1227            .await
1228            .expect("Unrecoverable error occurred while inserting removing entry from the database")
1229            .map(|value_bytes| {
1230                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1231            })
1232    }
1233    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1234    where
1235        KP: DatabaseLookup + MaybeSend + MaybeSync,
1236    {
1237        self.raw_remove_by_prefix(&key_prefix.to_bytes())
1238            .await
1239            .expect("Unrecoverable error when removing entries from the database");
1240    }
1241}
1242
1243/// A database type that has decoders, which allows it to implement
1244/// [`IDatabaseTransactionOpsCoreTyped`]
1245pub trait WithDecoders {
1246    fn decoders(&self) -> &ModuleDecoderRegistry;
1247}
1248
1249/// Raw database transaction (e.g. rocksdb implementation)
1250#[apply(async_trait_maybe_send!)]
1251pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1252    async fn commit_tx(self) -> Result<()>;
1253}
1254
1255/// Fedimint database transaction
1256///
1257/// See [`IDatabase`] for more info.
1258#[apply(async_trait_maybe_send!)]
1259pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1260    /// Commit the transaction
1261    async fn commit_tx(&mut self) -> Result<()>;
1262
1263    /// Is global database
1264    fn is_global(&self) -> bool;
1265
1266    /// Get the global database tx from a module-prefixed database transaction
1267    ///
1268    /// Meant to be called only by core internals, and module developers should
1269    /// not call it directly.
1270    #[doc(hidden)]
1271    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1272        -> &mut dyn IDatabaseTransaction;
1273}
1274
1275#[apply(async_trait_maybe_send!)]
1276impl<T> IDatabaseTransaction for Box<T>
1277where
1278    T: IDatabaseTransaction + ?Sized,
1279{
1280    async fn commit_tx(&mut self) -> Result<()> {
1281        (**self).commit_tx().await
1282    }
1283
1284    fn is_global(&self) -> bool {
1285        (**self).is_global()
1286    }
1287
1288    fn global_dbtx(
1289        &mut self,
1290        access_token: GlobalDBTxAccessToken,
1291    ) -> &mut dyn IDatabaseTransaction {
1292        (**self).global_dbtx(access_token)
1293    }
1294}
1295
1296#[apply(async_trait_maybe_send!)]
1297impl<'a, T> IDatabaseTransaction for &'a mut T
1298where
1299    T: IDatabaseTransaction + ?Sized,
1300{
1301    async fn commit_tx(&mut self) -> Result<()> {
1302        (**self).commit_tx().await
1303    }
1304
1305    fn is_global(&self) -> bool {
1306        (**self).is_global()
1307    }
1308
1309    fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1310        (**self).global_dbtx(access_key)
1311    }
1312}
1313
1314/// Struct that implements `IRawDatabaseTransaction` and can be wrapped
1315/// easier in other structs since it does not consumed `self` by move.
1316struct BaseDatabaseTransaction<Tx> {
1317    // TODO: merge options
1318    raw: Option<Tx>,
1319    notify_queue: Option<NotifyQueue>,
1320    notifications: Arc<Notifications>,
1321}
1322
1323impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1324where
1325    Tx: fmt::Debug,
1326{
1327    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1328        f.write_fmt(format_args!(
1329            "BaseDatabaseTransaction{{ raw={:?} }}",
1330            self.raw
1331        ))
1332    }
1333}
1334impl<Tx> BaseDatabaseTransaction<Tx>
1335where
1336    Tx: IRawDatabaseTransaction,
1337{
1338    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1339        Self {
1340            raw: Some(dbtx),
1341            notifications,
1342            notify_queue: Some(NotifyQueue::new()),
1343        }
1344    }
1345
1346    fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1347        self.notify_queue
1348            .as_mut()
1349            .context("can not call add_notification_key after commit")?
1350            .add(&key);
1351        Ok(())
1352    }
1353}
1354
1355#[apply(async_trait_maybe_send!)]
1356impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1357    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1358        self.add_notification_key(key)?;
1359        self.raw
1360            .as_mut()
1361            .context("Cannot insert into already consumed transaction")?
1362            .raw_insert_bytes(key, value)
1363            .await
1364    }
1365
1366    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1367        self.raw
1368            .as_mut()
1369            .context("Cannot retrieve from already consumed transaction")?
1370            .raw_get_bytes(key)
1371            .await
1372    }
1373
1374    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1375        self.add_notification_key(key)?;
1376        self.raw
1377            .as_mut()
1378            .context("Cannot remove from already consumed transaction")?
1379            .raw_remove_entry(key)
1380            .await
1381    }
1382
1383    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1384        self.raw
1385            .as_mut()
1386            .context("Cannot retrieve from already consumed transaction")?
1387            .raw_find_by_range(key_range)
1388            .await
1389    }
1390
1391    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1392        self.raw
1393            .as_mut()
1394            .context("Cannot retrieve from already consumed transaction")?
1395            .raw_find_by_prefix(key_prefix)
1396            .await
1397    }
1398
1399    async fn raw_find_by_prefix_sorted_descending(
1400        &mut self,
1401        key_prefix: &[u8],
1402    ) -> Result<PrefixStream<'_>> {
1403        self.raw
1404            .as_mut()
1405            .context("Cannot retrieve from already consumed transaction")?
1406            .raw_find_by_prefix_sorted_descending(key_prefix)
1407            .await
1408    }
1409
1410    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1411        self.raw
1412            .as_mut()
1413            .context("Cannot remove from already consumed transaction")?
1414            .raw_remove_by_prefix(key_prefix)
1415            .await
1416    }
1417}
1418
1419#[apply(async_trait_maybe_send!)]
1420impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1421    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1422        self.raw
1423            .as_mut()
1424            .context("Cannot rollback to a savepoint on an already consumed transaction")?
1425            .rollback_tx_to_savepoint()
1426            .await?;
1427        Ok(())
1428    }
1429
1430    async fn set_tx_savepoint(&mut self) -> Result<()> {
1431        self.raw
1432            .as_mut()
1433            .context("Cannot set a tx savepoint on an already consumed transaction")?
1434            .set_tx_savepoint()
1435            .await?;
1436        Ok(())
1437    }
1438}
1439
1440#[apply(async_trait_maybe_send!)]
1441impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1442    for BaseDatabaseTransaction<Tx>
1443{
1444    async fn commit_tx(&mut self) -> Result<()> {
1445        self.raw
1446            .take()
1447            .context("Cannot commit an already committed transaction")?
1448            .commit_tx()
1449            .await?;
1450        self.notifications.submit_queue(
1451            &self
1452                .notify_queue
1453                .take()
1454                .expect("commit must be called only once"),
1455        );
1456        Ok(())
1457    }
1458
1459    fn is_global(&self) -> bool {
1460        true
1461    }
1462
1463    fn global_dbtx(
1464        &mut self,
1465        _access_token: GlobalDBTxAccessToken,
1466    ) -> &mut dyn IDatabaseTransaction {
1467        panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1468    }
1469}
1470
1471/// A helper for tracking and logging on `Drop` any instances of uncommitted
1472/// writes
1473#[derive(Clone)]
1474struct CommitTracker {
1475    /// Is the dbtx committed
1476    is_committed: bool,
1477    /// Does the dbtx have any writes
1478    has_writes: bool,
1479    /// Don't warn-log uncommitted writes
1480    ignore_uncommitted: bool,
1481}
1482
1483impl Drop for CommitTracker {
1484    fn drop(&mut self) {
1485        if self.has_writes && !self.is_committed {
1486            if self.ignore_uncommitted {
1487                trace!(
1488                    target: LOG_DB,
1489                    "DatabaseTransaction has writes and has not called commit, but that's expected."
1490                );
1491            } else {
1492                warn!(
1493                    target: LOG_DB,
1494                    location = ?backtrace::Backtrace::new(),
1495                    "DatabaseTransaction has writes and has not called commit."
1496                );
1497            }
1498        }
1499    }
1500}
1501
1502enum MaybeRef<'a, T> {
1503    Owned(T),
1504    Borrowed(&'a mut T),
1505}
1506
1507impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1508    type Target = T;
1509
1510    fn deref(&self) -> &Self::Target {
1511        match self {
1512            MaybeRef::Owned(o) => o,
1513            MaybeRef::Borrowed(r) => r,
1514        }
1515    }
1516}
1517
1518impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1519    fn deref_mut(&mut self) -> &mut Self::Target {
1520        match self {
1521            MaybeRef::Owned(o) => o,
1522            MaybeRef::Borrowed(r) => r,
1523        }
1524    }
1525}
1526
1527/// Session type for [`DatabaseTransaction`] that is allowed to commit
1528///
1529/// Opposite of [`NonCommittable`].
1530pub struct Committable;
1531
1532/// Session type for a [`DatabaseTransaction`] that is not allowed to commit
1533///
1534/// Opposite of [`Committable`].
1535pub struct NonCommittable;
1536
1537/// A high level database transaction handle
1538///
1539/// `Cap` is a session type
1540pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1541    tx: Box<dyn IDatabaseTransaction + 'tx>,
1542    decoders: ModuleDecoderRegistry,
1543    commit_tracker: MaybeRef<'tx, CommitTracker>,
1544    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1545    capability: marker::PhantomData<Cap>,
1546}
1547
1548impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1550        f.write_fmt(format_args!(
1551            "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1552            self.tx, self.decoders
1553        ))
1554    }
1555}
1556
1557impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1558    fn decoders(&self) -> &ModuleDecoderRegistry {
1559        &self.decoders
1560    }
1561}
1562
1563#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1564fn decode_value<V: DatabaseValue>(
1565    value_bytes: &[u8],
1566    decoders: &ModuleDecoderRegistry,
1567) -> Result<V, DecodingError> {
1568    trace!(
1569        bytes = %AbbreviateHexBytes(value_bytes),
1570        "decoding value",
1571    );
1572    V::from_bytes(value_bytes, decoders)
1573}
1574
1575fn decode_value_expect<V: DatabaseValue>(
1576    value_bytes: &[u8],
1577    decoders: &ModuleDecoderRegistry,
1578    key_bytes: &[u8],
1579) -> V {
1580    decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1581        panic!(
1582            "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1583            any::type_name::<V>(),
1584            err,
1585            AbbreviateHexBytes(key_bytes),
1586            AbbreviateHexBytes(value_bytes),
1587        )
1588    })
1589}
1590
1591fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1592    trace!(
1593        bytes = %AbbreviateHexBytes(key_bytes),
1594        "decoding key",
1595    );
1596    K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1597        panic!(
1598            "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1599            any::type_name::<K>(),
1600            err,
1601            AbbreviateHexBytes(key_bytes)
1602        )
1603    })
1604}
1605
1606impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1607    /// Convert into a non-committable version
1608    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1609        DatabaseTransaction {
1610            tx: self.tx,
1611            decoders: self.decoders,
1612            commit_tracker: self.commit_tracker,
1613            on_commit_hooks: self.on_commit_hooks,
1614            capability: PhantomData::<NonCommittable>,
1615        }
1616    }
1617
1618    /// Get a reference to a non-committeable version
1619    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1620    where
1621        's: 'a,
1622    {
1623        self.to_ref().into_nc()
1624    }
1625
1626    /// Get [`DatabaseTransaction`] isolated to a `prefix`
1627    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1628    where
1629        'tx: 'a,
1630    {
1631        DatabaseTransaction {
1632            tx: Box::new(PrefixDatabaseTransaction {
1633                inner: self.tx,
1634                global_dbtx_access_token: None,
1635                prefix,
1636            }),
1637            decoders: self.decoders,
1638            commit_tracker: self.commit_tracker,
1639            on_commit_hooks: self.on_commit_hooks,
1640            capability: self.capability,
1641        }
1642    }
1643
1644    /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1645    /// `module_instance_id`, allowing the module to access global_dbtx
1646    /// with the right access token.
1647    pub fn with_prefix_module_id<'a: 'tx>(
1648        self,
1649        module_instance_id: ModuleInstanceId,
1650    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1651    where
1652        'tx: 'a,
1653    {
1654        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1655        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1656        (
1657            DatabaseTransaction {
1658                tx: Box::new(PrefixDatabaseTransaction {
1659                    inner: self.tx,
1660                    global_dbtx_access_token: Some(global_dbtx_access_token),
1661                    prefix,
1662                }),
1663                decoders: self.decoders,
1664                commit_tracker: self.commit_tracker,
1665                on_commit_hooks: self.on_commit_hooks,
1666                capability: self.capability,
1667            },
1668            global_dbtx_access_token,
1669        )
1670    }
1671
1672    /// Get [`DatabaseTransaction`] to `self`
1673    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1674    where
1675        's: 'a,
1676    {
1677        let decoders = self.decoders.clone();
1678
1679        DatabaseTransaction {
1680            tx: Box::new(&mut self.tx),
1681            decoders,
1682            commit_tracker: match self.commit_tracker {
1683                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1684                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1685            },
1686            on_commit_hooks: match self.on_commit_hooks {
1687                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1688                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1689            },
1690            capability: self.capability,
1691        }
1692    }
1693
1694    /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1695    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1696    where
1697        'tx: 'a,
1698    {
1699        DatabaseTransaction {
1700            tx: Box::new(PrefixDatabaseTransaction {
1701                inner: &mut self.tx,
1702                global_dbtx_access_token: None,
1703                prefix,
1704            }),
1705            decoders: self.decoders.clone(),
1706            commit_tracker: match self.commit_tracker {
1707                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1708                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1709            },
1710            on_commit_hooks: match self.on_commit_hooks {
1711                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1712                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1713            },
1714            capability: self.capability,
1715        }
1716    }
1717
1718    pub fn to_ref_with_prefix_module_id<'a>(
1719        &'a mut self,
1720        module_instance_id: ModuleInstanceId,
1721    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1722    where
1723        'tx: 'a,
1724    {
1725        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1726        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1727        (
1728            DatabaseTransaction {
1729                tx: Box::new(PrefixDatabaseTransaction {
1730                    inner: &mut self.tx,
1731                    global_dbtx_access_token: Some(global_dbtx_access_token),
1732                    prefix,
1733                }),
1734                decoders: self.decoders.clone(),
1735                commit_tracker: match self.commit_tracker {
1736                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1737                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1738                },
1739                on_commit_hooks: match self.on_commit_hooks {
1740                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1741                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1742                },
1743                capability: self.capability,
1744            },
1745            global_dbtx_access_token,
1746        )
1747    }
1748
1749    /// Is this `Database` a global, unpartitioned `Database`
1750    pub fn is_global(&self) -> bool {
1751        self.tx.is_global()
1752    }
1753
1754    /// `Err` if [`Self::is_global`] is not true
1755    pub fn ensure_global(&self) -> Result<()> {
1756        if !self.is_global() {
1757            bail!("Database instance not global");
1758        }
1759
1760        Ok(())
1761    }
1762
1763    /// `Err` if [`Self::is_global`] is true
1764    pub fn ensure_isolated(&self) -> Result<()> {
1765        if self.is_global() {
1766            bail!("Database instance not isolated");
1767        }
1768
1769        Ok(())
1770    }
1771
1772    /// Cancel the tx to avoid debugging warnings about uncommitted writes
1773    pub fn ignore_uncommitted(&mut self) -> &mut Self {
1774        self.commit_tracker.ignore_uncommitted = true;
1775        self
1776    }
1777
1778    /// Create warnings about uncommitted writes
1779    pub fn warn_uncommitted(&mut self) -> &mut Self {
1780        self.commit_tracker.ignore_uncommitted = false;
1781        self
1782    }
1783
1784    /// Register a hook that will be run after commit succeeds.
1785    #[instrument(target = LOG_DB, level = "trace", skip_all)]
1786    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1787        self.on_commit_hooks.push(Box::new(f));
1788    }
1789
1790    pub fn global_dbtx<'a>(
1791        &'a mut self,
1792        access_token: GlobalDBTxAccessToken,
1793    ) -> DatabaseTransaction<'a, Cap>
1794    where
1795        'tx: 'a,
1796    {
1797        let decoders = self.decoders.clone();
1798
1799        DatabaseTransaction {
1800            tx: Box::new(self.tx.global_dbtx(access_token)),
1801            decoders,
1802            commit_tracker: match self.commit_tracker {
1803                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1804                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1805            },
1806            on_commit_hooks: match self.on_commit_hooks {
1807                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1808                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1809            },
1810            capability: self.capability,
1811        }
1812    }
1813}
1814
1815/// Code used to access `global_dbtx`
1816#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1817pub struct GlobalDBTxAccessToken(u32);
1818
1819impl GlobalDBTxAccessToken {
1820    /// Calculate an access code for accessing global_dbtx from a prefixed
1821    /// database tx
1822    ///
1823    /// Since we need to do it at runtime, we want the user modules not to be
1824    /// able to call `global_dbtx` too easily. But at the same time we don't
1825    /// need to be paranoid.
1826    ///
1827    /// This must be deterministic during whole instance of the software running
1828    /// (because it's being rederived independently in multiple codepahs) , but
1829    /// it could be somewhat randomized between different runs and releases.
1830    fn from_prefix(prefix: &[u8]) -> Self {
1831        Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1832    }
1833}
1834
1835impl<'tx> DatabaseTransaction<'tx, Committable> {
1836    pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1837        Self {
1838            tx: dbtx,
1839            decoders,
1840            commit_tracker: MaybeRef::Owned(CommitTracker {
1841                is_committed: false,
1842                has_writes: false,
1843                ignore_uncommitted: false,
1844            }),
1845            on_commit_hooks: MaybeRef::Owned(vec![]),
1846            capability: PhantomData,
1847        }
1848    }
1849
1850    pub async fn commit_tx_result(mut self) -> Result<()> {
1851        self.commit_tracker.is_committed = true;
1852        let commit_result = self.tx.commit_tx().await;
1853
1854        // Run commit hooks in case commit was successful
1855        if commit_result.is_ok() {
1856            for hook in self.on_commit_hooks.deref_mut().drain(..) {
1857                hook();
1858            }
1859        }
1860
1861        commit_result
1862    }
1863
1864    pub async fn commit_tx(mut self) {
1865        self.commit_tracker.is_committed = true;
1866        self.commit_tx_result()
1867            .await
1868            .expect("Unrecoverable error occurred while committing to the database.");
1869    }
1870}
1871
1872#[apply(async_trait_maybe_send!)]
1873impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1874where
1875    Cap: Send,
1876{
1877    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1878        self.commit_tracker.has_writes = true;
1879        self.tx.raw_insert_bytes(key, value).await
1880    }
1881
1882    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1883        self.tx.raw_get_bytes(key).await
1884    }
1885
1886    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1887        self.tx.raw_remove_entry(key).await
1888    }
1889
1890    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1891        self.tx.raw_find_by_range(key_range).await
1892    }
1893
1894    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1895        self.tx.raw_find_by_prefix(key_prefix).await
1896    }
1897
1898    async fn raw_find_by_prefix_sorted_descending(
1899        &mut self,
1900        key_prefix: &[u8],
1901    ) -> Result<PrefixStream<'_>> {
1902        self.tx
1903            .raw_find_by_prefix_sorted_descending(key_prefix)
1904            .await
1905    }
1906
1907    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1908        self.commit_tracker.has_writes = true;
1909        self.tx.raw_remove_by_prefix(key_prefix).await
1910    }
1911}
1912#[apply(async_trait_maybe_send!)]
1913impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1914    async fn set_tx_savepoint(&mut self) -> Result<()> {
1915        self.tx.set_tx_savepoint().await
1916    }
1917
1918    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1919        self.tx.rollback_tx_to_savepoint().await
1920    }
1921}
1922
1923impl<T> DatabaseKeyPrefix for T
1924where
1925    T: DatabaseLookup + crate::encoding::Encodable + Debug,
1926{
1927    fn to_bytes(&self) -> Vec<u8> {
1928        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1929        data.append(&mut self.consensus_encode_to_vec());
1930        data
1931    }
1932}
1933
1934impl<T> DatabaseKey for T
1935where
1936    // Note: key can only be `T` that can be decoded without modules (even if
1937    // module type is `()`)
1938    T: DatabaseRecord + crate::encoding::Decodable + Sized,
1939{
1940    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1941    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1942        if data.is_empty() {
1943            // TODO: build better coding errors, pretty useless right now
1944            return Err(DecodingError::wrong_length(1, 0));
1945        }
1946
1947        if data[0] != Self::DB_PREFIX {
1948            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1949        }
1950
1951        <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1952            .map_err(|decode_error| DecodingError::Other(decode_error.0))
1953    }
1954}
1955
1956impl<T> DatabaseValue for T
1957where
1958    T: Debug + Encodable + Decodable,
1959{
1960    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1961        T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1962    }
1963
1964    fn to_bytes(&self) -> Vec<u8> {
1965        self.consensus_encode_to_vec()
1966    }
1967}
1968
1969/// This is a helper macro that generates the implementations of
1970/// `DatabaseRecord` necessary for reading/writing to the
1971/// database and fetching by prefix.
1972///
1973/// - `key`: This is the type of struct that will be used as the key into the
1974///   database
1975/// - `value`: This is the type of struct that will be used as the value into
1976///   the database
1977/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
1978///   prepended to this key
1979/// - `query_prefix`: Optional type of struct that can be passed zero or more
1980///   times. Every query prefix can be used to query the database via
1981///   `find_by_prefix`
1982///
1983/// # Examples
1984///
1985/// ```
1986/// use fedimint_core::encoding::{Decodable, Encodable};
1987/// use fedimint_core::impl_db_record;
1988///
1989/// #[derive(Debug, Encodable, Decodable)]
1990/// struct MyKey;
1991///
1992/// #[derive(Debug, Encodable, Decodable)]
1993/// struct MyValue;
1994///
1995/// #[repr(u8)]
1996/// #[derive(Clone, Debug)]
1997/// pub enum DbKeyPrefix {
1998///     MyKey = 0x50,
1999/// }
2000///
2001/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
2002/// ```
2003///
2004/// Use the required parameters and specify one `query_prefix`
2005///
2006/// ```
2007/// use fedimint_core::encoding::{Decodable, Encodable};
2008/// use fedimint_core::{impl_db_lookup, impl_db_record};
2009///
2010/// #[derive(Debug, Encodable, Decodable)]
2011/// struct MyKey;
2012///
2013/// #[derive(Debug, Encodable, Decodable)]
2014/// struct MyValue;
2015///
2016/// #[repr(u8)]
2017/// #[derive(Clone, Debug)]
2018/// pub enum DbKeyPrefix {
2019///     MyKey = 0x50,
2020/// }
2021///
2022/// #[derive(Debug, Encodable, Decodable)]
2023/// struct MyKeyPrefix;
2024///
2025/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
2026///
2027/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
2028/// ```
2029#[macro_export]
2030macro_rules! impl_db_record {
2031    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
2032        impl $crate::db::DatabaseRecord for $key {
2033            const DB_PREFIX: u8 = $db_prefix as u8;
2034            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2035            type Key = Self;
2036            type Value = $val;
2037        }
2038        $(
2039            impl_db_record! {
2040                @impl_notify_marker key = $key, notify_on_modify = $notify
2041            }
2042        )?
2043    };
2044    // if notify is set to true
2045    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2046        impl $crate::db::DatabaseKeyWithNotify for $key {}
2047    };
2048    // if notify is set to false
2049    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2050}
2051
2052#[macro_export]
2053macro_rules! impl_db_lookup{
2054    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2055        $(
2056            impl $crate::db::DatabaseLookup for $query_prefix {
2057                type Record = $key;
2058            }
2059        )*
2060    };
2061}
2062
2063/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
2064#[derive(Debug, Encodable, Decodable, Serialize)]
2065pub struct DatabaseVersionKeyV0;
2066
2067#[derive(Debug, Encodable, Decodable, Serialize)]
2068pub struct DatabaseVersionKey(pub ModuleInstanceId);
2069
2070#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2071pub struct DatabaseVersion(pub u64);
2072
2073impl_db_record!(
2074    key = DatabaseVersionKeyV0,
2075    value = DatabaseVersion,
2076    db_prefix = DbKeyPrefix::DatabaseVersion
2077);
2078
2079impl_db_record!(
2080    key = DatabaseVersionKey,
2081    value = DatabaseVersion,
2082    db_prefix = DbKeyPrefix::DatabaseVersion
2083);
2084
2085impl std::fmt::Display for DatabaseVersion {
2086    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2087        write!(f, "{}", self.0)
2088    }
2089}
2090
2091impl DatabaseVersion {
2092    pub fn increment(&self) -> Self {
2093        Self(self.0 + 1)
2094    }
2095}
2096
2097impl std::fmt::Display for DbKeyPrefix {
2098    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2099        write!(f, "{self:?}")
2100    }
2101}
2102
2103#[repr(u8)]
2104#[derive(Clone, EnumIter, Debug)]
2105pub enum DbKeyPrefix {
2106    DatabaseVersion = 0x50,
2107    ClientBackup = 0x51,
2108}
2109
2110#[derive(Debug, Error)]
2111pub enum DecodingError {
2112    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2113    WrongPrefix { expected: u8, found: u8 },
2114    #[error("Key had a wrong length, expected {expected} but got {found}")]
2115    WrongLength { expected: usize, found: usize },
2116    #[error("Other decoding error: {0:#}")]
2117    Other(anyhow::Error),
2118}
2119
2120impl DecodingError {
2121    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2122        Self::Other(anyhow::Error::from(error))
2123    }
2124
2125    pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2126        Self::WrongPrefix { expected, found }
2127    }
2128
2129    pub fn wrong_length(expected: usize, found: usize) -> Self {
2130        Self::WrongLength { expected, found }
2131    }
2132}
2133
2134#[macro_export]
2135macro_rules! push_db_pair_items {
2136    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2137        let db_items =
2138            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2139                .await
2140                .map(|(key, val)| {
2141                    (
2142                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2143                        val,
2144                    )
2145                })
2146                .collect::<BTreeMap<String, $value_type>>()
2147                .await;
2148
2149        $map.insert($key_literal.to_string(), Box::new(db_items));
2150    };
2151}
2152
2153#[macro_export]
2154macro_rules! push_db_key_items {
2155    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => {
2156        let db_items =
2157            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2158                .await
2159                .map(|(key, _)| key)
2160                .collect::<Vec<$key_type>>()
2161                .await;
2162
2163        $map.insert($key_literal.to_string(), Box::new(db_items));
2164    };
2165}
2166
2167/// `CoreMigrationFn` that modules can implement to "migrate" the database
2168/// to the next database version.
2169pub type CoreMigrationFn = for<'tx> fn(
2170    MigrationContext<'tx>,
2171) -> Pin<
2172    Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2173>;
2174
2175/// Verifies that all database migrations are defined contiguously and returns
2176/// the "current" database version, which is one greater than the last key in
2177/// the map.
2178pub fn get_current_database_version<F>(
2179    migrations: &BTreeMap<DatabaseVersion, F>,
2180) -> DatabaseVersion {
2181    let versions = migrations.keys().copied().collect::<Vec<_>>();
2182
2183    // Verify that all database migrations are defined contiguously. If there is a
2184    // gap, this indicates a programming error and we should panic.
2185    if !versions
2186        .windows(2)
2187        .all(|window| window[0].increment() == window[1])
2188    {
2189        panic!("Database Migrations are not defined contiguously");
2190    }
2191
2192    versions
2193        .last()
2194        .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2195}
2196
2197/// See [`apply_migrations_server_dbtx`]
2198pub async fn apply_migrations_server(
2199    db: &Database,
2200    kind: String,
2201    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2202) -> Result<(), anyhow::Error> {
2203    let mut global_dbtx = db.begin_transaction().await;
2204    global_dbtx.ensure_global()?;
2205    apply_migrations_server_dbtx(&mut global_dbtx.to_ref_nc(), kind, migrations).await?;
2206    global_dbtx.commit_tx_result().await
2207}
2208
2209/// Applies the database migrations to a non-isolated database.
2210pub async fn apply_migrations_server_dbtx(
2211    global_dbtx: &mut DatabaseTransaction<'_>,
2212    kind: String,
2213    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2214) -> Result<(), anyhow::Error> {
2215    global_dbtx.ensure_global()?;
2216    apply_migrations_dbtx(global_dbtx, kind, migrations, None, None).await
2217}
2218
2219pub async fn apply_migrations(
2220    db: &Database,
2221    kind: String,
2222    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2223    module_instance_id: Option<ModuleInstanceId>,
2224    // When used in client side context, we can/should ignore keys that external app
2225    // is allowed to use, and but since this function is shared, we make it optional argument
2226    external_prefixes_above: Option<u8>,
2227) -> Result<(), anyhow::Error> {
2228    let mut dbtx = db.begin_transaction().await;
2229    apply_migrations_dbtx(
2230        &mut dbtx.to_ref_nc(),
2231        kind,
2232        migrations,
2233        module_instance_id,
2234        external_prefixes_above,
2235    )
2236    .await?;
2237
2238    dbtx.commit_tx_result().await
2239}
2240/// `apply_migrations` iterates from the on disk database version for the
2241/// module.
2242///
2243/// `apply_migrations` iterates from the on disk database version for the module
2244/// up to `target_db_version` and executes all of the migrations that exist in
2245/// the migrations map. Each migration in migrations map updates the
2246/// database to have the correct on-disk structures that the code is expecting.
2247/// The entire migration process is atomic (i.e migration from 0->1 and 1->2
2248/// happen atomically). This function is called before the module is initialized
2249/// and as long as the correct migrations are supplied in the migrations map,
2250/// the module will be able to read and write from the database successfully.
2251pub async fn apply_migrations_dbtx(
2252    global_dbtx: &mut DatabaseTransaction<'_>,
2253    kind: String,
2254    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2255    module_instance_id: Option<ModuleInstanceId>,
2256    // When used in client side context, we can/should ignore keys that external app
2257    // is allowed to use, and but since this function is shared, we make it optional argument
2258    external_prefixes_above: Option<u8>,
2259) -> Result<(), anyhow::Error> {
2260    // Newly created databases will not have any data since they have just been
2261    // instantiated.
2262    let is_new_db = global_dbtx
2263        .raw_find_by_prefix(&[])
2264        .await?
2265        .filter(|(key, _v)| {
2266            std::future::ready(
2267                external_prefixes_above.map_or(true, |external_prefixes_above| {
2268                    !key.is_empty() && key[0] < external_prefixes_above
2269                }),
2270            )
2271        })
2272        .next()
2273        .await
2274        .is_none();
2275
2276    let target_db_version = get_current_database_version(&migrations);
2277
2278    // First write the database version to disk if it does not exist.
2279    create_database_version_dbtx(
2280        global_dbtx,
2281        target_db_version,
2282        module_instance_id,
2283        kind.clone(),
2284        is_new_db,
2285    )
2286    .await?;
2287
2288    let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2289
2290    let disk_version = global_dbtx
2291        .get_value(&DatabaseVersionKey(module_instance_id_key))
2292        .await;
2293
2294    let db_version = if let Some(disk_version) = disk_version {
2295        let mut current_db_version = disk_version;
2296
2297        if current_db_version > target_db_version {
2298            return Err(anyhow::anyhow!(format!(
2299                "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2300            )));
2301        }
2302
2303        while current_db_version < target_db_version {
2304            if let Some(migration) = migrations.get(&current_db_version) {
2305                info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2306                migration(MigrationContext {
2307                    dbtx: global_dbtx.to_ref_nc(),
2308                    module_instance_id,
2309                })
2310                .await?;
2311            } else {
2312                warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2313            }
2314
2315            current_db_version = current_db_version.increment();
2316            global_dbtx
2317                .insert_entry(
2318                    &DatabaseVersionKey(module_instance_id_key),
2319                    &current_db_version,
2320                )
2321                .await;
2322        }
2323
2324        current_db_version
2325    } else {
2326        target_db_version
2327    };
2328
2329    debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2330    Ok(())
2331}
2332
2333pub async fn create_database_version(
2334    db: &Database,
2335    target_db_version: DatabaseVersion,
2336    module_instance_id: Option<ModuleInstanceId>,
2337    kind: String,
2338    is_new_db: bool,
2339) -> Result<(), anyhow::Error> {
2340    let mut dbtx = db.begin_transaction().await;
2341
2342    create_database_version_dbtx(
2343        &mut dbtx.to_ref_nc(),
2344        target_db_version,
2345        module_instance_id,
2346        kind,
2347        is_new_db,
2348    )
2349    .await?;
2350
2351    dbtx.commit_tx_result().await?;
2352    Ok(())
2353}
2354
2355/// Creates the `DatabaseVersion` inside the database if it does not exist. If
2356/// necessary, this function will migrate the legacy database version to the
2357/// expected `DatabaseVersionKey`.
2358pub async fn create_database_version_dbtx(
2359    global_dbtx: &mut DatabaseTransaction<'_>,
2360    target_db_version: DatabaseVersion,
2361    module_instance_id: Option<ModuleInstanceId>,
2362    kind: String,
2363    is_new_db: bool,
2364) -> Result<(), anyhow::Error> {
2365    let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2366
2367    // First check if the module has a `DatabaseVersion` written to
2368    // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
2369    // nothing to do.
2370    if global_dbtx
2371        .get_value(&DatabaseVersionKey(key_module_instance_id))
2372        .await
2373        .is_none()
2374    {
2375        // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
2376        // in the module's isolated namespace (but not for fedimint-server or
2377        // fedimint-client).
2378        //
2379        // Otherwise, if the previous database contains data and no legacy database
2380        // version, use `DatabaseVersion(0)` so that all database migrations are
2381        // run. Otherwise, this database can assumed to be new and can use
2382        // `target_db_version` to skip the database migrations.
2383        let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2384            remove_current_db_version_if_exists(
2385                &mut global_dbtx
2386                    .to_ref_with_prefix_module_id(module_instance_id)
2387                    .0
2388                    .into_nc(),
2389                is_new_db,
2390                target_db_version,
2391            )
2392            .await
2393        } else {
2394            remove_current_db_version_if_exists(
2395                &mut global_dbtx.to_ref().into_nc(),
2396                is_new_db,
2397                target_db_version,
2398            )
2399            .await
2400        };
2401
2402        // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2403        debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2404        global_dbtx
2405            .insert_new_entry(
2406                &DatabaseVersionKey(key_module_instance_id),
2407                &current_version_in_module,
2408            )
2409            .await;
2410    }
2411
2412    Ok(())
2413}
2414
2415/// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2416/// returns the current database version. If the current version does not
2417/// exist, use `target_db_version` if the database is new. Otherwise, return
2418/// `DatabaseVersion(0)` to ensure all migrations are run.
2419async fn remove_current_db_version_if_exists(
2420    version_dbtx: &mut DatabaseTransaction<'_>,
2421    is_new_db: bool,
2422    target_db_version: DatabaseVersion,
2423) -> DatabaseVersion {
2424    // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2425    // exist, just use the 0 for the version so that all of the migrations are
2426    // executed.
2427    let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2428    match current_version_in_module {
2429        Some(database_version) => database_version,
2430        None if is_new_db => target_db_version,
2431        None => DatabaseVersion(0),
2432    }
2433}
2434
2435/// Helper function to retrieve the `module_instance_id` for modules, otherwise
2436/// return 0xff for the global namespace.
2437fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2438    // Use 0xff for fedimint-server and the `module_instance_id` for each module
2439    module_instance_id.map_or_else(
2440        || MODULE_GLOBAL_PREFIX.into(),
2441        |module_instance_id| module_instance_id,
2442    )
2443}
2444
2445pub struct MigrationContext<'tx> {
2446    dbtx: DatabaseTransaction<'tx>,
2447    module_instance_id: Option<ModuleInstanceId>,
2448}
2449
2450impl<'tx> MigrationContext<'tx> {
2451    pub fn dbtx(&mut self) -> DatabaseTransaction {
2452        if let Some(module_instance_id) = self.module_instance_id {
2453            self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2454        } else {
2455            self.dbtx.to_ref_nc()
2456        }
2457    }
2458
2459    pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2460        self.module_instance_id
2461    }
2462
2463    #[doc(hidden)]
2464    pub fn __global_dbtx(&mut self) -> &mut DatabaseTransaction<'tx> {
2465        &mut self.dbtx
2466    }
2467}
2468
2469#[allow(unused_imports)]
2470mod test_utils {
2471    use std::collections::BTreeMap;
2472    use std::time::Duration;
2473
2474    use fedimint_core::db::MigrationContext;
2475    use futures::future::ready;
2476    use futures::{Future, FutureExt, StreamExt};
2477    use rand::Rng;
2478    use tokio::join;
2479
2480    use super::{
2481        apply_migrations, CoreMigrationFn, Database, DatabaseTransaction, DatabaseVersion,
2482        DatabaseVersionKey, DatabaseVersionKeyV0,
2483    };
2484    use crate::core::ModuleKind;
2485    use crate::db::mem_impl::MemDatabase;
2486    use crate::db::{
2487        IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2488    };
2489    use crate::encoding::{Decodable, Encodable};
2490    use crate::module::registry::ModuleDecoderRegistry;
2491
2492    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2493        crate::runtime::timeout(Duration::from_millis(10), fut)
2494            .await
2495            .ok()
2496    }
2497
2498    #[repr(u8)]
2499    #[derive(Clone)]
2500    pub enum TestDbKeyPrefix {
2501        Test = 0x42,
2502        AltTest = 0x43,
2503        PercentTestKey = 0x25,
2504    }
2505
2506    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2507    pub(super) struct TestKey(pub u64);
2508
2509    #[derive(Debug, Encodable, Decodable)]
2510    struct DbPrefixTestPrefix;
2511
2512    impl_db_record!(
2513        key = TestKey,
2514        value = TestVal,
2515        db_prefix = TestDbKeyPrefix::Test,
2516        notify_on_modify = true,
2517    );
2518    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2519
2520    #[derive(Debug, Encodable, Decodable)]
2521    struct TestKeyV0(u64, u64);
2522
2523    #[derive(Debug, Encodable, Decodable)]
2524    struct DbPrefixTestPrefixV0;
2525
2526    impl_db_record!(
2527        key = TestKeyV0,
2528        value = TestVal,
2529        db_prefix = TestDbKeyPrefix::Test,
2530    );
2531    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2532
2533    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2534    struct AltTestKey(u64);
2535
2536    #[derive(Debug, Encodable, Decodable)]
2537    struct AltDbPrefixTestPrefix;
2538
2539    impl_db_record!(
2540        key = AltTestKey,
2541        value = TestVal,
2542        db_prefix = TestDbKeyPrefix::AltTest,
2543    );
2544    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2545
2546    #[derive(Debug, Encodable, Decodable)]
2547    struct PercentTestKey(u64);
2548
2549    #[derive(Debug, Encodable, Decodable)]
2550    struct PercentPrefixTestPrefix;
2551
2552    impl_db_record!(
2553        key = PercentTestKey,
2554        value = TestVal,
2555        db_prefix = TestDbKeyPrefix::PercentTestKey,
2556    );
2557
2558    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2559    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2560    pub(super) struct TestVal(pub u64);
2561
2562    const TEST_MODULE_PREFIX: u16 = 1;
2563    const ALT_MODULE_PREFIX: u16 = 2;
2564
2565    pub async fn verify_insert_elements(db: Database) {
2566        let mut dbtx = db.begin_transaction().await;
2567        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2568        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2569        dbtx.commit_tx().await;
2570
2571        // Test values were persisted
2572        let mut dbtx = db.begin_transaction().await;
2573        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2574        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2575        dbtx.commit_tx().await;
2576
2577        // Test overwrites work as expected
2578        let mut dbtx = db.begin_transaction().await;
2579        assert_eq!(
2580            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2581            Some(TestVal(2))
2582        );
2583        assert_eq!(
2584            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2585            Some(TestVal(3))
2586        );
2587        dbtx.commit_tx().await;
2588
2589        let mut dbtx = db.begin_transaction().await;
2590        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2591        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2592        dbtx.commit_tx().await;
2593    }
2594
2595    pub async fn verify_remove_nonexisting(db: Database) {
2596        let mut dbtx = db.begin_transaction().await;
2597        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2598        let removed = dbtx.remove_entry(&TestKey(1)).await;
2599        assert!(removed.is_none());
2600
2601        // Commit to suppress the warning message
2602        dbtx.commit_tx().await;
2603    }
2604
2605    pub async fn verify_remove_existing(db: Database) {
2606        let mut dbtx = db.begin_transaction().await;
2607
2608        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2609
2610        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2611
2612        let removed = dbtx.remove_entry(&TestKey(1)).await;
2613        assert_eq!(removed, Some(TestVal(2)));
2614        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2615
2616        // Commit to suppress the warning message
2617        dbtx.commit_tx().await;
2618    }
2619
2620    pub async fn verify_read_own_writes(db: Database) {
2621        let mut dbtx = db.begin_transaction().await;
2622
2623        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2624
2625        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2626
2627        // Commit to suppress the warning message
2628        dbtx.commit_tx().await;
2629    }
2630
2631    pub async fn verify_prevent_dirty_reads(db: Database) {
2632        let mut dbtx = db.begin_transaction().await;
2633
2634        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2635
2636        // dbtx2 should not be able to see uncommitted changes
2637        let mut dbtx2 = db.begin_transaction().await;
2638        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2639
2640        // Commit to suppress the warning message
2641        dbtx.commit_tx().await;
2642    }
2643
2644    pub async fn verify_find_by_range(db: Database) {
2645        let mut dbtx = db.begin_transaction().await;
2646        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2647        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2648        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2649
2650        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2651        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2652
2653        {
2654            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2655            module_dbtx
2656                .insert_entry(&TestKey(300), &TestVal(3000))
2657                .await;
2658        }
2659
2660        dbtx.commit_tx().await;
2661
2662        // Verify finding by prefix returns the correct set of key pairs
2663        let mut dbtx = db.begin_transaction_nc().await;
2664
2665        let returned_keys = dbtx
2666            .find_by_range(TestKey(55)..TestKey(56))
2667            .await
2668            .collect::<Vec<_>>()
2669            .await;
2670
2671        let expected = vec![(TestKey(55), TestVal(9999))];
2672
2673        assert_eq!(returned_keys, expected);
2674
2675        let returned_keys = dbtx
2676            .find_by_range(TestKey(54)..TestKey(56))
2677            .await
2678            .collect::<Vec<_>>()
2679            .await;
2680
2681        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2682        assert_eq!(returned_keys, expected);
2683
2684        let returned_keys = dbtx
2685            .find_by_range(TestKey(54)..TestKey(57))
2686            .await
2687            .collect::<Vec<_>>()
2688            .await;
2689
2690        let expected = vec![
2691            (TestKey(54), TestVal(8888)),
2692            (TestKey(55), TestVal(9999)),
2693            (TestKey(56), TestVal(7777)),
2694        ];
2695        assert_eq!(returned_keys, expected);
2696
2697        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2698        let test_range = module_dbtx
2699            .find_by_range(TestKey(300)..TestKey(301))
2700            .await
2701            .collect::<Vec<_>>()
2702            .await;
2703        assert!(test_range.len() == 1);
2704    }
2705
2706    pub async fn verify_find_by_prefix(db: Database) {
2707        let mut dbtx = db.begin_transaction().await;
2708        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2709        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2710
2711        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2712        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2713        dbtx.commit_tx().await;
2714
2715        // Verify finding by prefix returns the correct set of key pairs
2716        let mut dbtx = db.begin_transaction().await;
2717
2718        let returned_keys = dbtx
2719            .find_by_prefix(&DbPrefixTestPrefix)
2720            .await
2721            .collect::<Vec<_>>()
2722            .await;
2723
2724        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2725        assert_eq!(returned_keys, expected);
2726
2727        let reversed = dbtx
2728            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2729            .await
2730            .collect::<Vec<_>>()
2731            .await;
2732        let mut reversed_expected = expected;
2733        reversed_expected.reverse();
2734        assert_eq!(reversed, reversed_expected);
2735
2736        let returned_keys = dbtx
2737            .find_by_prefix(&AltDbPrefixTestPrefix)
2738            .await
2739            .collect::<Vec<_>>()
2740            .await;
2741
2742        let expected = vec![
2743            (AltTestKey(54), TestVal(6666)),
2744            (AltTestKey(55), TestVal(7777)),
2745        ];
2746        assert_eq!(returned_keys, expected);
2747
2748        let reversed = dbtx
2749            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2750            .await
2751            .collect::<Vec<_>>()
2752            .await;
2753        let mut reversed_expected = expected;
2754        reversed_expected.reverse();
2755        assert_eq!(reversed, reversed_expected);
2756    }
2757
2758    pub async fn verify_commit(db: Database) {
2759        let mut dbtx = db.begin_transaction().await;
2760
2761        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2762        dbtx.commit_tx().await;
2763
2764        // Verify dbtx2 can see committed transactions
2765        let mut dbtx2 = db.begin_transaction().await;
2766        assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2767    }
2768
2769    pub async fn verify_rollback_to_savepoint(db: Database) {
2770        let mut dbtx_rollback = db.begin_transaction().await;
2771
2772        dbtx_rollback
2773            .insert_entry(&TestKey(20), &TestVal(2000))
2774            .await;
2775
2776        dbtx_rollback
2777            .set_tx_savepoint()
2778            .await
2779            .expect("Error setting transaction savepoint");
2780
2781        dbtx_rollback
2782            .insert_entry(&TestKey(21), &TestVal(2001))
2783            .await;
2784
2785        assert_eq!(
2786            dbtx_rollback.get_value(&TestKey(20)).await,
2787            Some(TestVal(2000))
2788        );
2789        assert_eq!(
2790            dbtx_rollback.get_value(&TestKey(21)).await,
2791            Some(TestVal(2001))
2792        );
2793
2794        dbtx_rollback
2795            .rollback_tx_to_savepoint()
2796            .await
2797            .expect("Error setting transaction savepoint");
2798
2799        assert_eq!(
2800            dbtx_rollback.get_value(&TestKey(20)).await,
2801            Some(TestVal(2000))
2802        );
2803
2804        assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2805
2806        // Commit to suppress the warning message
2807        dbtx_rollback.commit_tx().await;
2808    }
2809
2810    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2811        let mut dbtx = db.begin_transaction().await;
2812        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2813
2814        let mut dbtx2 = db.begin_transaction().await;
2815
2816        dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2817
2818        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2819
2820        dbtx2.commit_tx().await;
2821
2822        // dbtx should still read None because it is operating over a snapshot
2823        // of the data when the transaction started
2824        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2825
2826        let expected_keys = 0;
2827        let returned_keys = dbtx
2828            .find_by_prefix(&DbPrefixTestPrefix)
2829            .await
2830            .fold(0, |returned_keys, (key, value)| async move {
2831                if key == TestKey(100) {
2832                    assert!(value.eq(&TestVal(101)));
2833                }
2834                returned_keys + 1
2835            })
2836            .await;
2837
2838        assert_eq!(returned_keys, expected_keys);
2839    }
2840
2841    pub async fn verify_snapshot_isolation(db: Database) {
2842        async fn random_yield() {
2843            let times = if rand::thread_rng().gen_bool(0.5) {
2844                0
2845            } else {
2846                10
2847            };
2848            for _ in 0..times {
2849                tokio::task::yield_now().await;
2850            }
2851        }
2852
2853        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
2854        for i in 0..1000 {
2855            let base_key = i * 2;
2856            let tx_accepted_key = base_key;
2857            let spent_input_key = base_key + 1;
2858
2859            join!(
2860                async {
2861                    random_yield().await;
2862                    let mut dbtx = db.begin_transaction().await;
2863
2864                    random_yield().await;
2865                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2866                    random_yield().await;
2867                    // we have 4 operations that can give you the db key,
2868                    // try all of them
2869                    let s = match i % 5 {
2870                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2871                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2872                        2 => {
2873                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2874                                .await
2875                        }
2876                        3 => {
2877                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
2878                                .await
2879                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2880                                .map(|(_k, v)| v)
2881                                .next()
2882                                .await
2883                        }
2884                        4 => {
2885                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2886                                .await
2887                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2888                                .map(|(_k, v)| v)
2889                                .next()
2890                                .await
2891                        }
2892                        _ => {
2893                            panic!("woot?");
2894                        }
2895                    };
2896
2897                    match (a, s) {
2898                        (None, None) | (Some(_), Some(_)) => {}
2899                        (None, Some(_)) => panic!("none some?! {i}"),
2900                        (Some(_), None) => panic!("some none?! {i}"),
2901                    }
2902                },
2903                async {
2904                    random_yield().await;
2905
2906                    let mut dbtx = db.begin_transaction().await;
2907                    random_yield().await;
2908                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2909
2910                    random_yield().await;
2911                    assert_eq!(
2912                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2913                            .await,
2914                        None
2915                    );
2916
2917                    random_yield().await;
2918                    assert_eq!(
2919                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2920                            .await,
2921                        None
2922                    );
2923                    random_yield().await;
2924                    dbtx.commit_tx().await;
2925                }
2926            );
2927        }
2928    }
2929
2930    pub async fn verify_phantom_entry(db: Database) {
2931        let mut dbtx = db.begin_transaction().await;
2932
2933        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2934
2935        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2936
2937        dbtx.commit_tx().await;
2938
2939        let mut dbtx = db.begin_transaction().await;
2940        let expected_keys = 2;
2941        let returned_keys = dbtx
2942            .find_by_prefix(&DbPrefixTestPrefix)
2943            .await
2944            .fold(0, |returned_keys, (key, value)| async move {
2945                match key {
2946                    TestKey(100) => {
2947                        assert!(value.eq(&TestVal(101)));
2948                    }
2949                    TestKey(101) => {
2950                        assert!(value.eq(&TestVal(102)));
2951                    }
2952                    _ => {}
2953                };
2954                returned_keys + 1
2955            })
2956            .await;
2957
2958        assert_eq!(returned_keys, expected_keys);
2959
2960        let mut dbtx2 = db.begin_transaction().await;
2961
2962        dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2963
2964        dbtx2.commit_tx().await;
2965
2966        let returned_keys = dbtx
2967            .find_by_prefix(&DbPrefixTestPrefix)
2968            .await
2969            .fold(0, |returned_keys, (key, value)| async move {
2970                match key {
2971                    TestKey(100) => {
2972                        assert!(value.eq(&TestVal(101)));
2973                    }
2974                    TestKey(101) => {
2975                        assert!(value.eq(&TestVal(102)));
2976                    }
2977                    _ => {}
2978                };
2979                returned_keys + 1
2980            })
2981            .await;
2982
2983        assert_eq!(returned_keys, expected_keys);
2984    }
2985
2986    pub async fn expect_write_conflict(db: Database) {
2987        let mut dbtx = db.begin_transaction().await;
2988        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2989        dbtx.commit_tx().await;
2990
2991        let mut dbtx2 = db.begin_transaction().await;
2992        let mut dbtx3 = db.begin_transaction().await;
2993
2994        dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2995
2996        // Depending on if the database implementation supports optimistic or
2997        // pessimistic transactions, this test should generate an error here
2998        // (pessimistic) or at commit time (optimistic)
2999        dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3000
3001        dbtx2.commit_tx().await;
3002        dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3003    }
3004
3005    pub async fn verify_string_prefix(db: Database) {
3006        let mut dbtx = db.begin_transaction().await;
3007        dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3008
3009        assert_eq!(
3010            dbtx.get_value(&PercentTestKey(100)).await,
3011            Some(TestVal(101))
3012        );
3013
3014        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3015
3016        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3017
3018        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3019
3020        // If the wildcard character ('%') is not handled properly, this will make
3021        // find_by_prefix return 5 results instead of 4
3022        dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3023
3024        let expected_keys = 4;
3025        let returned_keys = dbtx
3026            .find_by_prefix(&PercentPrefixTestPrefix)
3027            .await
3028            .fold(0, |returned_keys, (key, value)| async move {
3029                if matches!(key, PercentTestKey(101)) {
3030                    assert!(value.eq(&TestVal(100)));
3031                }
3032                returned_keys + 1
3033            })
3034            .await;
3035
3036        assert_eq!(returned_keys, expected_keys);
3037    }
3038
3039    pub async fn verify_remove_by_prefix(db: Database) {
3040        let mut dbtx = db.begin_transaction().await;
3041
3042        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3043
3044        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3045
3046        dbtx.commit_tx().await;
3047
3048        let mut remove_dbtx = db.begin_transaction().await;
3049        remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3050        remove_dbtx.commit_tx().await;
3051
3052        let mut dbtx = db.begin_transaction().await;
3053        let expected_keys = 0;
3054        let returned_keys = dbtx
3055            .find_by_prefix(&DbPrefixTestPrefix)
3056            .await
3057            .fold(0, |returned_keys, (key, value)| async move {
3058                match key {
3059                    TestKey(100) => {
3060                        assert!(value.eq(&TestVal(101)));
3061                    }
3062                    TestKey(101) => {
3063                        assert!(value.eq(&TestVal(102)));
3064                    }
3065                    _ => {}
3066                };
3067                returned_keys + 1
3068            })
3069            .await;
3070
3071        assert_eq!(returned_keys, expected_keys);
3072    }
3073
3074    pub async fn verify_module_db(db: Database, module_db: Database) {
3075        let mut dbtx = db.begin_transaction().await;
3076
3077        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3078
3079        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3080
3081        dbtx.commit_tx().await;
3082
3083        // verify module_dbtx can only read key/value pairs from its own module
3084        let mut module_dbtx = module_db.begin_transaction().await;
3085        assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3086
3087        assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3088
3089        // verify module_dbtx can read key/value pairs that it wrote
3090        let mut dbtx = db.begin_transaction().await;
3091        assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3092
3093        assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3094
3095        let mut module_dbtx = module_db.begin_transaction().await;
3096
3097        module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3098
3099        module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3100
3101        module_dbtx.commit_tx().await;
3102
3103        let expected_keys = 2;
3104        let mut dbtx = db.begin_transaction().await;
3105        let returned_keys = dbtx
3106            .find_by_prefix(&DbPrefixTestPrefix)
3107            .await
3108            .fold(0, |returned_keys, (key, value)| async move {
3109                match key {
3110                    TestKey(100) => {
3111                        assert!(value.eq(&TestVal(101)));
3112                    }
3113                    TestKey(101) => {
3114                        assert!(value.eq(&TestVal(102)));
3115                    }
3116                    _ => {}
3117                };
3118                returned_keys + 1
3119            })
3120            .await;
3121
3122        assert_eq!(returned_keys, expected_keys);
3123
3124        let removed = dbtx.remove_entry(&TestKey(100)).await;
3125        assert_eq!(removed, Some(TestVal(101)));
3126        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3127
3128        let mut module_dbtx = module_db.begin_transaction().await;
3129        assert_eq!(
3130            module_dbtx.get_value(&TestKey(100)).await,
3131            Some(TestVal(103))
3132        );
3133    }
3134
3135    pub async fn verify_module_prefix(db: Database) {
3136        let mut test_dbtx = db.begin_transaction().await;
3137        {
3138            let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3139
3140            test_module_dbtx
3141                .insert_entry(&TestKey(100), &TestVal(101))
3142                .await;
3143
3144            test_module_dbtx
3145                .insert_entry(&TestKey(101), &TestVal(102))
3146                .await;
3147        }
3148
3149        test_dbtx.commit_tx().await;
3150
3151        let mut alt_dbtx = db.begin_transaction().await;
3152        {
3153            let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3154
3155            alt_module_dbtx
3156                .insert_entry(&TestKey(100), &TestVal(103))
3157                .await;
3158
3159            alt_module_dbtx
3160                .insert_entry(&TestKey(101), &TestVal(104))
3161                .await;
3162        }
3163
3164        alt_dbtx.commit_tx().await;
3165
3166        // verify test_module_dbtx can only see key/value pairs from its own module
3167        let mut test_dbtx = db.begin_transaction().await;
3168        let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3169        assert_eq!(
3170            test_module_dbtx.get_value(&TestKey(100)).await,
3171            Some(TestVal(101))
3172        );
3173
3174        assert_eq!(
3175            test_module_dbtx.get_value(&TestKey(101)).await,
3176            Some(TestVal(102))
3177        );
3178
3179        let expected_keys = 2;
3180        let returned_keys = test_module_dbtx
3181            .find_by_prefix(&DbPrefixTestPrefix)
3182            .await
3183            .fold(0, |returned_keys, (key, value)| async move {
3184                match key {
3185                    TestKey(100) => {
3186                        assert!(value.eq(&TestVal(101)));
3187                    }
3188                    TestKey(101) => {
3189                        assert!(value.eq(&TestVal(102)));
3190                    }
3191                    _ => {}
3192                };
3193                returned_keys + 1
3194            })
3195            .await;
3196
3197        assert_eq!(returned_keys, expected_keys);
3198
3199        let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3200        assert_eq!(removed, Some(TestVal(101)));
3201        assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3202
3203        // test_dbtx on its own wont find the key because it does not use a module
3204        // prefix
3205        let mut test_dbtx = db.begin_transaction().await;
3206        assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3207
3208        test_dbtx.commit_tx().await;
3209    }
3210
3211    #[cfg(test)]
3212    #[tokio::test]
3213    pub async fn verify_test_migration() {
3214        // Insert a bunch of old dummy data that needs to be migrated to a new version
3215        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3216        let expected_test_keys_size: usize = 100;
3217        let mut dbtx = db.begin_transaction().await;
3218        for i in 0..expected_test_keys_size {
3219            dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3220                .await;
3221        }
3222
3223        // Will also be migrated to `DatabaseVersionKey`
3224        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3225            .await;
3226        dbtx.commit_tx().await;
3227
3228        let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
3229
3230        migrations.insert(DatabaseVersion(0), |ctx| {
3231            migrate_test_db_version_0(ctx).boxed()
3232        });
3233
3234        apply_migrations(&db, "TestModule".to_string(), migrations, None, None)
3235            .await
3236            .expect("Error applying migrations for TestModule");
3237
3238        // Verify that the migrations completed successfully
3239        let mut dbtx = db.begin_transaction().await;
3240
3241        // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
3242        // to `DatabaseVersionKey`
3243        assert!(dbtx
3244            .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3245            .await
3246            .is_some());
3247
3248        // Verify Dummy module migration
3249        let test_keys = dbtx
3250            .find_by_prefix(&DbPrefixTestPrefix)
3251            .await
3252            .collect::<Vec<_>>()
3253            .await;
3254        let test_keys_size = test_keys.len();
3255        assert_eq!(test_keys_size, expected_test_keys_size);
3256        for (key, val) in test_keys {
3257            assert_eq!(key.0, val.0 + 1);
3258        }
3259    }
3260
3261    #[allow(dead_code)]
3262    async fn migrate_test_db_version_0(mut ctx: MigrationContext<'_>) -> Result<(), anyhow::Error> {
3263        let mut dbtx = ctx.dbtx();
3264        let example_keys_v0 = dbtx
3265            .find_by_prefix(&DbPrefixTestPrefixV0)
3266            .await
3267            .collect::<Vec<_>>()
3268            .await;
3269        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3270        for (key, val) in example_keys_v0 {
3271            let key_v2 = TestKey(key.1);
3272            dbtx.insert_new_entry(&key_v2, &val).await;
3273        }
3274        Ok(())
3275    }
3276
3277    #[cfg(test)]
3278    #[tokio::test]
3279    async fn test_autocommit() {
3280        use std::marker::PhantomData;
3281        use std::ops::Range;
3282        use std::path::Path;
3283
3284        use anyhow::anyhow;
3285        use async_trait::async_trait;
3286
3287        use crate::db::{
3288            AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3289            IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3290            IRawDatabaseTransaction,
3291        };
3292        use crate::ModuleDecoderRegistry;
3293
3294        #[derive(Debug)]
3295        struct FakeDatabase;
3296
3297        #[async_trait]
3298        impl IRawDatabase for FakeDatabase {
3299            type Transaction<'a> = FakeTransaction<'a>;
3300            async fn begin_transaction(&self) -> FakeTransaction {
3301                FakeTransaction(PhantomData)
3302            }
3303
3304            fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3305                Ok(())
3306            }
3307        }
3308
3309        #[derive(Debug)]
3310        struct FakeTransaction<'a>(PhantomData<&'a ()>);
3311
3312        #[async_trait]
3313        impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
3314            async fn raw_insert_bytes(
3315                &mut self,
3316                _key: &[u8],
3317                _value: &[u8],
3318            ) -> anyhow::Result<Option<Vec<u8>>> {
3319                unimplemented!()
3320            }
3321
3322            async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3323                unimplemented!()
3324            }
3325
3326            async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3327                unimplemented!()
3328            }
3329
3330            async fn raw_find_by_range(
3331                &mut self,
3332                _key_range: Range<&[u8]>,
3333            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3334                unimplemented!()
3335            }
3336
3337            async fn raw_find_by_prefix(
3338                &mut self,
3339                _key_prefix: &[u8],
3340            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3341                unimplemented!()
3342            }
3343
3344            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3345                unimplemented!()
3346            }
3347
3348            async fn raw_find_by_prefix_sorted_descending(
3349                &mut self,
3350                _key_prefix: &[u8],
3351            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3352                unimplemented!()
3353            }
3354        }
3355
3356        #[async_trait]
3357        impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
3358            async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3359                unimplemented!()
3360            }
3361
3362            async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3363                unimplemented!()
3364            }
3365        }
3366
3367        #[async_trait]
3368        impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
3369            async fn commit_tx(self) -> anyhow::Result<()> {
3370                Err(anyhow!("Can't commit!"))
3371            }
3372        }
3373
3374        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3375        let err = db
3376            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3377            .await
3378            .unwrap_err();
3379
3380        match err {
3381            AutocommitError::CommitFailed {
3382                attempts: failed_attempts,
3383                ..
3384            } => {
3385                assert_eq!(failed_attempts, 5);
3386            }
3387            AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3388        }
3389    }
3390}
3391
3392pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3393    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3394    decoders: ModuleDecoderRegistry,
3395    key_prefix: &KP,
3396) -> impl Stream<
3397    Item = (
3398        KP::Record,
3399        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3400    ),
3401> + 'r
3402where
3403    'inner: 'r,
3404    KP: DatabaseLookup,
3405    KP::Record: DatabaseKey,
3406{
3407    debug!(target: LOG_DB, "find by prefix sorted descending");
3408    let prefix_bytes = key_prefix.to_bytes();
3409    tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3410        .await
3411        .expect("Error doing prefix search in database")
3412        .map(move |(key_bytes, value_bytes)| {
3413            let key = decode_key_expect(&key_bytes, &decoders);
3414            let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3415            (key, value)
3416        })
3417}
3418
3419#[cfg(test)]
3420mod tests {
3421    use tokio::sync::oneshot;
3422
3423    use super::mem_impl::MemDatabase;
3424    use super::*;
3425    use crate::runtime::spawn;
3426
3427    async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> {
3428        let db = db.clone();
3429        let (tx, rx) = oneshot::channel::<()>();
3430        let join_handle = spawn("wait key exists", async move {
3431            let sub = db.wait_key_exists(&key);
3432            tx.send(()).unwrap();
3433            sub.await
3434        });
3435        rx.await.unwrap();
3436        join_handle
3437    }
3438
3439    #[tokio::test]
3440    async fn test_wait_key_before_transaction() {
3441        let key = TestKey(1);
3442        let val = TestVal(2);
3443        let db = MemDatabase::new().into_database();
3444
3445        let key_task = waiter(&db, TestKey(1)).await;
3446
3447        let mut tx = db.begin_transaction().await;
3448        tx.insert_new_entry(&key, &val).await;
3449        tx.commit_tx().await;
3450
3451        assert_eq!(
3452            future_returns_shortly(async { key_task.await.unwrap() }).await,
3453            Some(TestVal(2)),
3454            "should notify"
3455        );
3456    }
3457
3458    #[tokio::test]
3459    async fn test_wait_key_before_insert() {
3460        let key = TestKey(1);
3461        let val = TestVal(2);
3462        let db = MemDatabase::new().into_database();
3463
3464        let mut tx = db.begin_transaction().await;
3465        let key_task = waiter(&db, TestKey(1)).await;
3466        tx.insert_new_entry(&key, &val).await;
3467        tx.commit_tx().await;
3468
3469        assert_eq!(
3470            future_returns_shortly(async { key_task.await.unwrap() }).await,
3471            Some(TestVal(2)),
3472            "should notify"
3473        );
3474    }
3475
3476    #[tokio::test]
3477    async fn test_wait_key_after_insert() {
3478        let key = TestKey(1);
3479        let val = TestVal(2);
3480        let db = MemDatabase::new().into_database();
3481
3482        let mut tx = db.begin_transaction().await;
3483        tx.insert_new_entry(&key, &val).await;
3484
3485        let key_task = waiter(&db, TestKey(1)).await;
3486
3487        tx.commit_tx().await;
3488
3489        assert_eq!(
3490            future_returns_shortly(async { key_task.await.unwrap() }).await,
3491            Some(TestVal(2)),
3492            "should notify"
3493        );
3494    }
3495
3496    #[tokio::test]
3497    async fn test_wait_key_after_commit() {
3498        let key = TestKey(1);
3499        let val = TestVal(2);
3500        let db = MemDatabase::new().into_database();
3501
3502        let mut tx = db.begin_transaction().await;
3503        tx.insert_new_entry(&key, &val).await;
3504        tx.commit_tx().await;
3505
3506        let key_task = waiter(&db, TestKey(1)).await;
3507        assert_eq!(
3508            future_returns_shortly(async { key_task.await.unwrap() }).await,
3509            Some(TestVal(2)),
3510            "should notify"
3511        );
3512    }
3513
3514    #[tokio::test]
3515    async fn test_wait_key_isolated_db() {
3516        let module_instance_id = 10;
3517        let key = TestKey(1);
3518        let val = TestVal(2);
3519        let db = MemDatabase::new().into_database();
3520        let db = db.with_prefix_module_id(module_instance_id).0;
3521
3522        let key_task = waiter(&db, TestKey(1)).await;
3523
3524        let mut tx = db.begin_transaction().await;
3525        tx.insert_new_entry(&key, &val).await;
3526        tx.commit_tx().await;
3527
3528        assert_eq!(
3529            future_returns_shortly(async { key_task.await.unwrap() }).await,
3530            Some(TestVal(2)),
3531            "should notify"
3532        );
3533    }
3534
3535    #[tokio::test]
3536    async fn test_wait_key_isolated_tx() {
3537        let module_instance_id = 10;
3538        let key = TestKey(1);
3539        let val = TestVal(2);
3540        let db = MemDatabase::new().into_database();
3541
3542        let key_task = waiter(&db.with_prefix_module_id(module_instance_id).0, TestKey(1)).await;
3543
3544        let mut tx = db.begin_transaction().await;
3545        let mut tx_mod = tx.to_ref_with_prefix_module_id(module_instance_id).0;
3546        tx_mod.insert_new_entry(&key, &val).await;
3547        drop(tx_mod);
3548        tx.commit_tx().await;
3549
3550        assert_eq!(
3551            future_returns_shortly(async { key_task.await.unwrap() }).await,
3552            Some(TestVal(2)),
3553            "should notify"
3554        );
3555    }
3556
3557    #[tokio::test]
3558    async fn test_wait_key_no_transaction() {
3559        let db = MemDatabase::new().into_database();
3560
3561        let key_task = waiter(&db, TestKey(1)).await;
3562        assert_eq!(
3563            future_returns_shortly(async { key_task.await.unwrap() }).await,
3564            None,
3565            "should not notify"
3566        );
3567    }
3568
3569    #[tokio::test]
3570    async fn test_prefix_global_dbtx() {
3571        let module_instance_id = 10;
3572        let db = MemDatabase::new().into_database();
3573
3574        {
3575            // Plain module id prefix, can use `global_dbtx` to access global_dbtx
3576            let (db, access_token) = db.with_prefix_module_id(module_instance_id);
3577
3578            let mut tx = db.begin_transaction().await;
3579            let mut tx = tx.global_dbtx(access_token);
3580            tx.insert_new_entry(&TestKey(1), &TestVal(1)).await;
3581            tx.commit_tx().await;
3582        }
3583
3584        assert_eq!(
3585            db.begin_transaction_nc().await.get_value(&TestKey(1)).await,
3586            Some(TestVal(1))
3587        );
3588
3589        {
3590            // Additional non-module inner prefix, does not interfere with `global_dbtx`
3591            let (db, access_token) = db.with_prefix_module_id(module_instance_id);
3592
3593            let db = db.with_prefix(vec![3, 4]);
3594
3595            let mut tx = db.begin_transaction().await;
3596            let mut tx = tx.global_dbtx(access_token);
3597            tx.insert_new_entry(&TestKey(2), &TestVal(2)).await;
3598            tx.commit_tx().await;
3599        }
3600
3601        assert_eq!(
3602            db.begin_transaction_nc().await.get_value(&TestKey(2)).await,
3603            Some(TestVal(2))
3604        );
3605    }
3606
3607    #[tokio::test]
3608    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3609    async fn test_prefix_global_dbtx_panics_on_global_db() {
3610        let db = MemDatabase::new().into_database();
3611
3612        let mut tx = db.begin_transaction().await;
3613        let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
3614    }
3615
3616    #[tokio::test]
3617    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3618    async fn test_prefix_global_dbtx_panics_on_non_module_prefix() {
3619        let db = MemDatabase::new().into_database();
3620
3621        let prefix = vec![3, 4];
3622        let db = db.with_prefix(prefix.clone());
3623
3624        let mut tx = db.begin_transaction().await;
3625        let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&prefix));
3626    }
3627
3628    #[tokio::test]
3629    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3630    async fn test_prefix_global_dbtx_panics_on_wrong_access_token() {
3631        let db = MemDatabase::new().into_database();
3632
3633        let prefix = vec![3, 4];
3634        let db = db.with_prefix(prefix.clone());
3635
3636        let mut tx = db.begin_transaction().await;
3637        let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
3638    }
3639}