1use std::any;
106use std::collections::BTreeMap;
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{bail, Context, Result};
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use futures::{Stream, StreamExt};
120use macro_rules_attribute::apply;
121use rand::Rng;
122use serde::Serialize;
123use strum_macros::EnumIter;
124use thiserror::Error;
125use tracing::{debug, error, info, instrument, trace, warn};
126
127use crate::core::ModuleInstanceId;
128use crate::encoding::{Decodable, Encodable};
129use crate::fmt_utils::AbbreviateHexBytes;
130use crate::task::{MaybeSend, MaybeSync};
131use crate::util::FmtCompactAnyhow as _;
132use crate::{async_trait_maybe_send, maybe_add_send, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
142pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
143
144pub trait DatabaseKeyPrefix: Debug {
145 fn to_bytes(&self) -> Vec<u8>;
146}
147
148pub trait DatabaseRecord: DatabaseKeyPrefix {
151 const DB_PREFIX: u8;
152 const NOTIFY_ON_MODIFY: bool = false;
153 type Key: DatabaseKey + Debug;
154 type Value: DatabaseValue + Debug;
155}
156
157pub trait DatabaseLookup: DatabaseKeyPrefix {
160 type Record: DatabaseRecord;
161}
162
163impl<Record> DatabaseLookup for Record
165where
166 Record: DatabaseRecord + Debug + Decodable + Encodable,
167{
168 type Record = Record;
169}
170
171pub trait DatabaseKey: Sized {
174 const NOTIFY_ON_MODIFY: bool = false;
182 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
183}
184
185pub trait DatabaseKeyWithNotify {}
187
188pub trait DatabaseValue: Sized + Debug {
190 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
191 fn to_bytes(&self) -> Vec<u8>;
192}
193
194pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
195
196pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
200
201#[derive(Debug, Error)]
203pub enum AutocommitError<E> {
204 #[error("Commit Failed: {last_error}")]
206 CommitFailed {
207 attempts: usize,
209 last_error: anyhow::Error,
211 },
212 #[error("Closure error: {error}")]
215 ClosureError {
216 attempts: usize,
222 error: E,
224 },
225}
226
227#[apply(async_trait_maybe_send!)]
236pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
237 type Transaction<'a>: IRawDatabaseTransaction + Debug;
239
240 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
242
243 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
245}
246
247#[apply(async_trait_maybe_send!)]
248impl<T> IRawDatabase for Box<T>
249where
250 T: IRawDatabase,
251{
252 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
253
254 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
255 (**self).begin_transaction().await
256 }
257
258 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
259 (**self).checkpoint(backup_path)
260 }
261}
262
263pub trait IRawDatabaseExt: IRawDatabase + Sized {
265 fn into_database(self) -> Database {
269 Database::new(self, ModuleRegistry::default())
270 }
271}
272
273impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
274
275impl<T> From<T> for Database
276where
277 T: IRawDatabase,
278{
279 fn from(raw: T) -> Self {
280 Self::new(raw, ModuleRegistry::default())
281 }
282}
283
284#[apply(async_trait_maybe_send!)]
287pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
288 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
290 async fn register(&self, key: &[u8]);
292 async fn notify(&self, key: &[u8]);
294
295 fn is_global(&self) -> bool;
298
299 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
301}
302
303#[apply(async_trait_maybe_send!)]
304impl<T> IDatabase for Arc<T>
305where
306 T: IDatabase + ?Sized,
307{
308 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
309 (**self).begin_transaction().await
310 }
311 async fn register(&self, key: &[u8]) {
312 (**self).register(key).await;
313 }
314 async fn notify(&self, key: &[u8]) {
315 (**self).notify(key).await;
316 }
317
318 fn is_global(&self) -> bool {
319 (**self).is_global()
320 }
321
322 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
323 (**self).checkpoint(backup_path)
324 }
325}
326
327struct BaseDatabase<RawDatabase> {
331 notifications: Arc<Notifications>,
332 raw: RawDatabase,
333}
334
335impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
336 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337 f.write_str("BaseDatabase")
338 }
339}
340
341#[apply(async_trait_maybe_send!)]
342impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
343 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
344 Box::new(BaseDatabaseTransaction::new(
345 self.raw.begin_transaction().await,
346 self.notifications.clone(),
347 ))
348 }
349 async fn register(&self, key: &[u8]) {
350 self.notifications.register(key).await;
351 }
352 async fn notify(&self, key: &[u8]) {
353 self.notifications.notify(key);
354 }
355
356 fn is_global(&self) -> bool {
357 true
358 }
359
360 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
361 self.raw.checkpoint(backup_path)
362 }
363}
364
365#[derive(Clone, Debug)]
371pub struct Database {
372 inner: Arc<dyn IDatabase + 'static>,
373 module_decoders: ModuleDecoderRegistry,
374}
375
376impl Database {
377 pub fn strong_count(&self) -> usize {
378 Arc::strong_count(&self.inner)
379 }
380
381 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
382 self.inner
383 }
384}
385
386impl Database {
387 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
392 let inner = BaseDatabase {
393 raw,
394 notifications: Arc::new(Notifications::new()),
395 };
396 Self::new_from_arc(
397 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
398 module_decoders,
399 )
400 }
401
402 pub fn new_from_arc(
404 inner: Arc<dyn IDatabase + 'static>,
405 module_decoders: ModuleDecoderRegistry,
406 ) -> Self {
407 Self {
408 inner,
409 module_decoders,
410 }
411 }
412
413 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
415 Self {
416 inner: Arc::new(PrefixDatabase {
417 inner: self.inner.clone(),
418 global_dbtx_access_token: None,
419 prefix,
420 }),
421 module_decoders: self.module_decoders.clone(),
422 }
423 }
424
425 pub fn with_prefix_module_id(
429 &self,
430 module_instance_id: ModuleInstanceId,
431 ) -> (Self, GlobalDBTxAccessToken) {
432 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
433 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
434 (
435 Self {
436 inner: Arc::new(PrefixDatabase {
437 inner: self.inner.clone(),
438 global_dbtx_access_token: Some(global_dbtx_access_token),
439 prefix,
440 }),
441 module_decoders: self.module_decoders.clone(),
442 },
443 global_dbtx_access_token,
444 )
445 }
446
447 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
448 Self {
449 inner: self.inner.clone(),
450 module_decoders,
451 }
452 }
453
454 pub fn is_global(&self) -> bool {
456 self.inner.is_global()
457 }
458
459 pub fn ensure_global(&self) -> Result<()> {
461 if !self.is_global() {
462 bail!("Database instance not global");
463 }
464
465 Ok(())
466 }
467
468 pub fn ensure_isolated(&self) -> Result<()> {
470 if self.is_global() {
471 bail!("Database instance not isolated");
472 }
473
474 Ok(())
475 }
476
477 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
479 where
480 's: 'tx,
481 {
482 DatabaseTransaction::<Committable>::new(
483 self.inner.begin_transaction().await,
484 self.module_decoders.clone(),
485 )
486 }
487
488 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
490 where
491 's: 'tx,
492 {
493 self.begin_transaction().await.into_nc()
494 }
495
496 pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
497 self.inner.checkpoint(backup_path)
498 }
499
500 pub async fn autocommit<'s, 'dbtx, F, T, E>(
528 &'s self,
529 tx_fn: F,
530 max_attempts: Option<usize>,
531 ) -> Result<T, AutocommitError<E>>
532 where
533 's: 'dbtx,
534 for<'r, 'o> F: Fn(
535 &'r mut DatabaseTransaction<'o>,
536 PhantomBound<'dbtx, 'o>,
537 ) -> BoxFuture<'r, Result<T, E>>,
538 {
539 assert_ne!(max_attempts, Some(0));
540 let mut curr_attempts: usize = 0;
541
542 loop {
543 curr_attempts = curr_attempts
548 .checked_add(1)
549 .expect("db autocommit attempt counter overflowed");
550
551 let mut dbtx = self.begin_transaction().await;
552
553 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
554 let val = match tx_fn_res {
555 Ok(val) => val,
556 Err(err) => {
557 dbtx.ignore_uncommitted();
558 return Err(AutocommitError::ClosureError {
559 attempts: curr_attempts,
560 error: err,
561 });
562 }
563 };
564
565 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
566
567 match dbtx.commit_tx_result().await {
568 Ok(()) => {
569 return Ok(val);
570 }
571 Err(err) => {
572 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
573 warn!(
574 target: LOG_DB,
575 curr_attempts,
576 ?err,
577 "Database commit failed in an autocommit block - terminating"
578 );
579 return Err(AutocommitError::CommitFailed {
580 attempts: curr_attempts,
581 last_error: err,
582 });
583 }
584
585 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
586 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
587 warn!(
588 target: LOG_DB,
589 curr_attempts,
590 err = %err.fmt_compact_anyhow(),
591 delay_ms = %delay,
592 "Database commit failed in an autocommit block - retrying"
593 );
594 crate::runtime::sleep(Duration::from_millis(delay)).await;
595 }
596 }
597 }
598 }
599
600 pub async fn wait_key_check<'a, K, T>(
605 &'a self,
606 key: &K,
607 checker: impl Fn(Option<K::Value>) -> Option<T>,
608 ) -> (T, DatabaseTransaction<'a, Committable>)
609 where
610 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
611 {
612 let key_bytes = key.to_bytes();
613 loop {
614 let notify = self.inner.register(&key_bytes);
616
617 let mut tx = self.inner.begin_transaction().await;
619
620 let maybe_value_bytes = tx
621 .raw_get_bytes(&key_bytes)
622 .await
623 .expect("Unrecoverable error when reading from database")
624 .map(|value_bytes| {
625 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
626 });
627
628 if let Some(value) = checker(maybe_value_bytes) {
629 return (
630 value,
631 DatabaseTransaction::new(tx, self.module_decoders.clone()),
632 );
633 }
634
635 notify.await;
637 }
640 }
641
642 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
644 where
645 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
646 {
647 self.wait_key_check(key, std::convert::identity).await.0
648 }
649}
650
651fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
652 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
653 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
654 bytes
655}
656
657#[derive(Clone, Debug)]
660struct PrefixDatabase<Inner>
661where
662 Inner: Debug,
663{
664 prefix: Vec<u8>,
665 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
666 inner: Inner,
667}
668
669impl<Inner> PrefixDatabase<Inner>
670where
671 Inner: Debug,
672{
673 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
677 let mut full_key = self.prefix.clone();
678 full_key.extend_from_slice(key);
679 full_key
680 }
681}
682
683#[apply(async_trait_maybe_send!)]
684impl<Inner> IDatabase for PrefixDatabase<Inner>
685where
686 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
687{
688 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
689 Box::new(PrefixDatabaseTransaction {
690 inner: self.inner.begin_transaction().await,
691 global_dbtx_access_token: self.global_dbtx_access_token,
692 prefix: self.prefix.clone(),
693 })
694 }
695 async fn register(&self, key: &[u8]) {
696 self.inner.register(&self.get_full_key(key)).await;
697 }
698
699 async fn notify(&self, key: &[u8]) {
700 self.inner.notify(&self.get_full_key(key)).await;
701 }
702
703 fn is_global(&self) -> bool {
704 if self.global_dbtx_access_token.is_some() {
705 false
706 } else {
707 self.inner.is_global()
708 }
709 }
710
711 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
712 self.inner.checkpoint(backup_path)
713 }
714}
715
716#[derive(Debug)]
721struct PrefixDatabaseTransaction<Inner> {
722 inner: Inner,
723 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
724 prefix: Vec<u8>,
725}
726
727impl<Inner> PrefixDatabaseTransaction<Inner> {
728 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
732 let mut full_key = self.prefix.clone();
733 full_key.extend_from_slice(key);
734 full_key
735 }
736
737 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
738 Range {
739 start: self.get_full_key(range.start),
740 end: self.get_full_key(range.end),
741 }
742 }
743
744 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
745 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
746 }
747}
748
749#[apply(async_trait_maybe_send!)]
750impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
751where
752 Inner: IDatabaseTransaction,
753{
754 async fn commit_tx(&mut self) -> Result<()> {
755 self.inner.commit_tx().await
756 }
757
758 fn is_global(&self) -> bool {
759 if self.global_dbtx_access_token.is_some() {
760 false
761 } else {
762 self.inner.is_global()
763 }
764 }
765
766 fn global_dbtx(
767 &mut self,
768 access_token: GlobalDBTxAccessToken,
769 ) -> &mut dyn IDatabaseTransaction {
770 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
771 assert_eq!(
772 access_token, self_global_dbtx_access_token,
773 "Invalid access key used to access global_dbtx"
774 );
775 &mut self.inner
776 } else {
777 self.inner.global_dbtx(access_token)
778 }
779 }
780}
781
782#[apply(async_trait_maybe_send!)]
783impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
784where
785 Inner: IDatabaseTransactionOpsCore,
786{
787 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
788 let key = self.get_full_key(key);
789 self.inner.raw_insert_bytes(&key, value).await
790 }
791
792 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
793 let key = self.get_full_key(key);
794 self.inner.raw_get_bytes(&key).await
795 }
796
797 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
798 let key = self.get_full_key(key);
799 self.inner.raw_remove_entry(&key).await
800 }
801
802 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
803 let key = self.get_full_key(key_prefix);
804 let stream = self.inner.raw_find_by_prefix(&key).await?;
805 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
806 }
807
808 async fn raw_find_by_prefix_sorted_descending(
809 &mut self,
810 key_prefix: &[u8],
811 ) -> Result<PrefixStream<'_>> {
812 let key = self.get_full_key(key_prefix);
813 let stream = self
814 .inner
815 .raw_find_by_prefix_sorted_descending(&key)
816 .await?;
817 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
818 }
819
820 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
821 let range = self.get_full_range(range);
822 let stream = self
823 .inner
824 .raw_find_by_range(Range {
825 start: &range.start,
826 end: &range.end,
827 })
828 .await?;
829 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
830 }
831
832 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
833 let key = self.get_full_key(key_prefix);
834 self.inner.raw_remove_by_prefix(&key).await
835 }
836}
837
838#[apply(async_trait_maybe_send!)]
839impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
840where
841 Inner: IDatabaseTransactionOps,
842{
843 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
844 self.inner.rollback_tx_to_savepoint().await
845 }
846
847 async fn set_tx_savepoint(&mut self) -> Result<()> {
848 self.set_tx_savepoint().await
849 }
850}
851
852#[apply(async_trait_maybe_send!)]
856pub trait IDatabaseTransactionOpsCore: MaybeSend {
857 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
859
860 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
862
863 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
865
866 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
869
870 async fn raw_find_by_prefix_sorted_descending(
872 &mut self,
873 key_prefix: &[u8],
874 ) -> Result<PrefixStream<'_>>;
875
876 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
880
881 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
883}
884
885#[apply(async_trait_maybe_send!)]
886impl<T> IDatabaseTransactionOpsCore for Box<T>
887where
888 T: IDatabaseTransactionOpsCore + ?Sized,
889{
890 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
891 (**self).raw_insert_bytes(key, value).await
892 }
893
894 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
895 (**self).raw_get_bytes(key).await
896 }
897
898 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
899 (**self).raw_remove_entry(key).await
900 }
901
902 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
903 (**self).raw_find_by_prefix(key_prefix).await
904 }
905
906 async fn raw_find_by_prefix_sorted_descending(
907 &mut self,
908 key_prefix: &[u8],
909 ) -> Result<PrefixStream<'_>> {
910 (**self)
911 .raw_find_by_prefix_sorted_descending(key_prefix)
912 .await
913 }
914
915 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
916 (**self).raw_find_by_range(range).await
917 }
918
919 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
920 (**self).raw_remove_by_prefix(key_prefix).await
921 }
922}
923
924#[apply(async_trait_maybe_send!)]
925impl<T> IDatabaseTransactionOpsCore for &mut T
926where
927 T: IDatabaseTransactionOpsCore + ?Sized,
928{
929 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
930 (**self).raw_insert_bytes(key, value).await
931 }
932
933 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
934 (**self).raw_get_bytes(key).await
935 }
936
937 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
938 (**self).raw_remove_entry(key).await
939 }
940
941 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
942 (**self).raw_find_by_prefix(key_prefix).await
943 }
944
945 async fn raw_find_by_prefix_sorted_descending(
946 &mut self,
947 key_prefix: &[u8],
948 ) -> Result<PrefixStream<'_>> {
949 (**self)
950 .raw_find_by_prefix_sorted_descending(key_prefix)
951 .await
952 }
953
954 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
955 (**self).raw_find_by_range(range).await
956 }
957
958 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
959 (**self).raw_remove_by_prefix(key_prefix).await
960 }
961}
962
963#[apply(async_trait_maybe_send!)]
969pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
970 async fn set_tx_savepoint(&mut self) -> Result<()>;
979
980 async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
981}
982
983#[apply(async_trait_maybe_send!)]
984impl<T> IDatabaseTransactionOps for Box<T>
985where
986 T: IDatabaseTransactionOps + ?Sized,
987{
988 async fn set_tx_savepoint(&mut self) -> Result<()> {
989 (**self).set_tx_savepoint().await
990 }
991
992 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
993 (**self).rollback_tx_to_savepoint().await
994 }
995}
996
997#[apply(async_trait_maybe_send!)]
998impl<T> IDatabaseTransactionOps for &mut T
999where
1000 T: IDatabaseTransactionOps + ?Sized,
1001{
1002 async fn set_tx_savepoint(&mut self) -> Result<()> {
1003 (**self).set_tx_savepoint().await
1004 }
1005
1006 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1007 (**self).rollback_tx_to_savepoint().await
1008 }
1009}
1010
1011#[apply(async_trait_maybe_send!)]
1017pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1018 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1019 where
1020 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1021
1022 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1023 where
1024 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1025 K::Value: MaybeSend + MaybeSync;
1026
1027 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1028 where
1029 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1030 K::Value: MaybeSend + MaybeSync;
1031
1032 async fn find_by_range<K>(
1033 &mut self,
1034 key_range: Range<K>,
1035 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1036 where
1037 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1038 K::Value: MaybeSend + MaybeSync;
1039
1040 async fn find_by_prefix<KP>(
1041 &mut self,
1042 key_prefix: &KP,
1043 ) -> Pin<
1044 Box<
1045 maybe_add_send!(
1046 dyn Stream<
1047 Item = (
1048 KP::Record,
1049 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1050 ),
1051 > + '_
1052 ),
1053 >,
1054 >
1055 where
1056 KP: DatabaseLookup + MaybeSend + MaybeSync,
1057 KP::Record: DatabaseKey;
1058
1059 async fn find_by_prefix_sorted_descending<KP>(
1060 &mut self,
1061 key_prefix: &KP,
1062 ) -> Pin<
1063 Box<
1064 maybe_add_send!(
1065 dyn Stream<
1066 Item = (
1067 KP::Record,
1068 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1069 ),
1070 > + '_
1071 ),
1072 >,
1073 >
1074 where
1075 KP: DatabaseLookup + MaybeSend + MaybeSync,
1076 KP::Record: DatabaseKey;
1077
1078 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1079 where
1080 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1081
1082 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1083 where
1084 KP: DatabaseLookup + MaybeSend + MaybeSync;
1085}
1086
1087#[apply(async_trait_maybe_send!)]
1090impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1091where
1092 T: IDatabaseTransactionOpsCore + WithDecoders,
1093{
1094 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1095 where
1096 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1097 {
1098 let key_bytes = key.to_bytes();
1099 let raw = self
1100 .raw_get_bytes(&key_bytes)
1101 .await
1102 .expect("Unrecoverable error occurred while reading and entry from the database");
1103 raw.map(|value_bytes| {
1104 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1105 })
1106 }
1107
1108 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1109 where
1110 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1111 K::Value: MaybeSend + MaybeSync,
1112 {
1113 let key_bytes = key.to_bytes();
1114 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1115 .await
1116 .expect("Unrecoverable error occurred while inserting entry into the database")
1117 .map(|value_bytes| {
1118 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1119 })
1120 }
1121
1122 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1123 where
1124 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1125 K::Value: MaybeSend + MaybeSync,
1126 {
1127 if let Some(prev) = self.insert_entry(key, value).await {
1128 panic!(
1129 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1130 );
1131 }
1132 }
1133
1134 async fn find_by_range<K>(
1135 &mut self,
1136 key_range: Range<K>,
1137 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1138 where
1139 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1140 K::Value: MaybeSend + MaybeSync,
1141 {
1142 let decoders = self.decoders().clone();
1143 Box::pin(
1144 self.raw_find_by_range(Range {
1145 start: &key_range.start.to_bytes(),
1146 end: &key_range.end.to_bytes(),
1147 })
1148 .await
1149 .expect("Unrecoverable error occurred while listing entries from the database")
1150 .map(move |(key_bytes, value_bytes)| {
1151 let key = decode_key_expect(&key_bytes, &decoders);
1152 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1153 (key, value)
1154 }),
1155 )
1156 }
1157
1158 async fn find_by_prefix<KP>(
1159 &mut self,
1160 key_prefix: &KP,
1161 ) -> Pin<
1162 Box<
1163 maybe_add_send!(
1164 dyn Stream<
1165 Item = (
1166 KP::Record,
1167 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1168 ),
1169 > + '_
1170 ),
1171 >,
1172 >
1173 where
1174 KP: DatabaseLookup + MaybeSend + MaybeSync,
1175 KP::Record: DatabaseKey,
1176 {
1177 let decoders = self.decoders().clone();
1178 Box::pin(
1179 self.raw_find_by_prefix(&key_prefix.to_bytes())
1180 .await
1181 .expect("Unrecoverable error occurred while listing entries from the database")
1182 .map(move |(key_bytes, value_bytes)| {
1183 let key = decode_key_expect(&key_bytes, &decoders);
1184 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1185 (key, value)
1186 }),
1187 )
1188 }
1189
1190 async fn find_by_prefix_sorted_descending<KP>(
1191 &mut self,
1192 key_prefix: &KP,
1193 ) -> Pin<
1194 Box<
1195 maybe_add_send!(
1196 dyn Stream<
1197 Item = (
1198 KP::Record,
1199 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1200 ),
1201 > + '_
1202 ),
1203 >,
1204 >
1205 where
1206 KP: DatabaseLookup + MaybeSend + MaybeSync,
1207 KP::Record: DatabaseKey,
1208 {
1209 let decoders = self.decoders().clone();
1210 Box::pin(
1211 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1212 .await
1213 .expect("Unrecoverable error occurred while listing entries from the database")
1214 .map(move |(key_bytes, value_bytes)| {
1215 let key = decode_key_expect(&key_bytes, &decoders);
1216 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1217 (key, value)
1218 }),
1219 )
1220 }
1221 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1222 where
1223 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1224 {
1225 let key_bytes = key.to_bytes();
1226 self.raw_remove_entry(&key_bytes)
1227 .await
1228 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1229 .map(|value_bytes| {
1230 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1231 })
1232 }
1233 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1234 where
1235 KP: DatabaseLookup + MaybeSend + MaybeSync,
1236 {
1237 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1238 .await
1239 .expect("Unrecoverable error when removing entries from the database");
1240 }
1241}
1242
1243pub trait WithDecoders {
1246 fn decoders(&self) -> &ModuleDecoderRegistry;
1247}
1248
1249#[apply(async_trait_maybe_send!)]
1251pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1252 async fn commit_tx(self) -> Result<()>;
1253}
1254
1255#[apply(async_trait_maybe_send!)]
1259pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1260 async fn commit_tx(&mut self) -> Result<()>;
1262
1263 fn is_global(&self) -> bool;
1265
1266 #[doc(hidden)]
1271 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1272 -> &mut dyn IDatabaseTransaction;
1273}
1274
1275#[apply(async_trait_maybe_send!)]
1276impl<T> IDatabaseTransaction for Box<T>
1277where
1278 T: IDatabaseTransaction + ?Sized,
1279{
1280 async fn commit_tx(&mut self) -> Result<()> {
1281 (**self).commit_tx().await
1282 }
1283
1284 fn is_global(&self) -> bool {
1285 (**self).is_global()
1286 }
1287
1288 fn global_dbtx(
1289 &mut self,
1290 access_token: GlobalDBTxAccessToken,
1291 ) -> &mut dyn IDatabaseTransaction {
1292 (**self).global_dbtx(access_token)
1293 }
1294}
1295
1296#[apply(async_trait_maybe_send!)]
1297impl<'a, T> IDatabaseTransaction for &'a mut T
1298where
1299 T: IDatabaseTransaction + ?Sized,
1300{
1301 async fn commit_tx(&mut self) -> Result<()> {
1302 (**self).commit_tx().await
1303 }
1304
1305 fn is_global(&self) -> bool {
1306 (**self).is_global()
1307 }
1308
1309 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1310 (**self).global_dbtx(access_key)
1311 }
1312}
1313
1314struct BaseDatabaseTransaction<Tx> {
1317 raw: Option<Tx>,
1319 notify_queue: Option<NotifyQueue>,
1320 notifications: Arc<Notifications>,
1321}
1322
1323impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1324where
1325 Tx: fmt::Debug,
1326{
1327 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1328 f.write_fmt(format_args!(
1329 "BaseDatabaseTransaction{{ raw={:?} }}",
1330 self.raw
1331 ))
1332 }
1333}
1334impl<Tx> BaseDatabaseTransaction<Tx>
1335where
1336 Tx: IRawDatabaseTransaction,
1337{
1338 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1339 Self {
1340 raw: Some(dbtx),
1341 notifications,
1342 notify_queue: Some(NotifyQueue::new()),
1343 }
1344 }
1345
1346 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1347 self.notify_queue
1348 .as_mut()
1349 .context("can not call add_notification_key after commit")?
1350 .add(&key);
1351 Ok(())
1352 }
1353}
1354
1355#[apply(async_trait_maybe_send!)]
1356impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1357 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1358 self.add_notification_key(key)?;
1359 self.raw
1360 .as_mut()
1361 .context("Cannot insert into already consumed transaction")?
1362 .raw_insert_bytes(key, value)
1363 .await
1364 }
1365
1366 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1367 self.raw
1368 .as_mut()
1369 .context("Cannot retrieve from already consumed transaction")?
1370 .raw_get_bytes(key)
1371 .await
1372 }
1373
1374 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1375 self.add_notification_key(key)?;
1376 self.raw
1377 .as_mut()
1378 .context("Cannot remove from already consumed transaction")?
1379 .raw_remove_entry(key)
1380 .await
1381 }
1382
1383 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1384 self.raw
1385 .as_mut()
1386 .context("Cannot retrieve from already consumed transaction")?
1387 .raw_find_by_range(key_range)
1388 .await
1389 }
1390
1391 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1392 self.raw
1393 .as_mut()
1394 .context("Cannot retrieve from already consumed transaction")?
1395 .raw_find_by_prefix(key_prefix)
1396 .await
1397 }
1398
1399 async fn raw_find_by_prefix_sorted_descending(
1400 &mut self,
1401 key_prefix: &[u8],
1402 ) -> Result<PrefixStream<'_>> {
1403 self.raw
1404 .as_mut()
1405 .context("Cannot retrieve from already consumed transaction")?
1406 .raw_find_by_prefix_sorted_descending(key_prefix)
1407 .await
1408 }
1409
1410 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1411 self.raw
1412 .as_mut()
1413 .context("Cannot remove from already consumed transaction")?
1414 .raw_remove_by_prefix(key_prefix)
1415 .await
1416 }
1417}
1418
1419#[apply(async_trait_maybe_send!)]
1420impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1421 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1422 self.raw
1423 .as_mut()
1424 .context("Cannot rollback to a savepoint on an already consumed transaction")?
1425 .rollback_tx_to_savepoint()
1426 .await?;
1427 Ok(())
1428 }
1429
1430 async fn set_tx_savepoint(&mut self) -> Result<()> {
1431 self.raw
1432 .as_mut()
1433 .context("Cannot set a tx savepoint on an already consumed transaction")?
1434 .set_tx_savepoint()
1435 .await?;
1436 Ok(())
1437 }
1438}
1439
1440#[apply(async_trait_maybe_send!)]
1441impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1442 for BaseDatabaseTransaction<Tx>
1443{
1444 async fn commit_tx(&mut self) -> Result<()> {
1445 self.raw
1446 .take()
1447 .context("Cannot commit an already committed transaction")?
1448 .commit_tx()
1449 .await?;
1450 self.notifications.submit_queue(
1451 &self
1452 .notify_queue
1453 .take()
1454 .expect("commit must be called only once"),
1455 );
1456 Ok(())
1457 }
1458
1459 fn is_global(&self) -> bool {
1460 true
1461 }
1462
1463 fn global_dbtx(
1464 &mut self,
1465 _access_token: GlobalDBTxAccessToken,
1466 ) -> &mut dyn IDatabaseTransaction {
1467 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1468 }
1469}
1470
1471#[derive(Clone)]
1474struct CommitTracker {
1475 is_committed: bool,
1477 has_writes: bool,
1479 ignore_uncommitted: bool,
1481}
1482
1483impl Drop for CommitTracker {
1484 fn drop(&mut self) {
1485 if self.has_writes && !self.is_committed {
1486 if self.ignore_uncommitted {
1487 trace!(
1488 target: LOG_DB,
1489 "DatabaseTransaction has writes and has not called commit, but that's expected."
1490 );
1491 } else {
1492 warn!(
1493 target: LOG_DB,
1494 location = ?backtrace::Backtrace::new(),
1495 "DatabaseTransaction has writes and has not called commit."
1496 );
1497 }
1498 }
1499 }
1500}
1501
1502enum MaybeRef<'a, T> {
1503 Owned(T),
1504 Borrowed(&'a mut T),
1505}
1506
1507impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1508 type Target = T;
1509
1510 fn deref(&self) -> &Self::Target {
1511 match self {
1512 MaybeRef::Owned(o) => o,
1513 MaybeRef::Borrowed(r) => r,
1514 }
1515 }
1516}
1517
1518impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1519 fn deref_mut(&mut self) -> &mut Self::Target {
1520 match self {
1521 MaybeRef::Owned(o) => o,
1522 MaybeRef::Borrowed(r) => r,
1523 }
1524 }
1525}
1526
1527pub struct Committable;
1531
1532pub struct NonCommittable;
1536
1537pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1541 tx: Box<dyn IDatabaseTransaction + 'tx>,
1542 decoders: ModuleDecoderRegistry,
1543 commit_tracker: MaybeRef<'tx, CommitTracker>,
1544 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1545 capability: marker::PhantomData<Cap>,
1546}
1547
1548impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1550 f.write_fmt(format_args!(
1551 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1552 self.tx, self.decoders
1553 ))
1554 }
1555}
1556
1557impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1558 fn decoders(&self) -> &ModuleDecoderRegistry {
1559 &self.decoders
1560 }
1561}
1562
1563#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1564fn decode_value<V: DatabaseValue>(
1565 value_bytes: &[u8],
1566 decoders: &ModuleDecoderRegistry,
1567) -> Result<V, DecodingError> {
1568 trace!(
1569 bytes = %AbbreviateHexBytes(value_bytes),
1570 "decoding value",
1571 );
1572 V::from_bytes(value_bytes, decoders)
1573}
1574
1575fn decode_value_expect<V: DatabaseValue>(
1576 value_bytes: &[u8],
1577 decoders: &ModuleDecoderRegistry,
1578 key_bytes: &[u8],
1579) -> V {
1580 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1581 panic!(
1582 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1583 any::type_name::<V>(),
1584 err,
1585 AbbreviateHexBytes(key_bytes),
1586 AbbreviateHexBytes(value_bytes),
1587 )
1588 })
1589}
1590
1591fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1592 trace!(
1593 bytes = %AbbreviateHexBytes(key_bytes),
1594 "decoding key",
1595 );
1596 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1597 panic!(
1598 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1599 any::type_name::<K>(),
1600 err,
1601 AbbreviateHexBytes(key_bytes)
1602 )
1603 })
1604}
1605
1606impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1607 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1609 DatabaseTransaction {
1610 tx: self.tx,
1611 decoders: self.decoders,
1612 commit_tracker: self.commit_tracker,
1613 on_commit_hooks: self.on_commit_hooks,
1614 capability: PhantomData::<NonCommittable>,
1615 }
1616 }
1617
1618 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1620 where
1621 's: 'a,
1622 {
1623 self.to_ref().into_nc()
1624 }
1625
1626 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1628 where
1629 'tx: 'a,
1630 {
1631 DatabaseTransaction {
1632 tx: Box::new(PrefixDatabaseTransaction {
1633 inner: self.tx,
1634 global_dbtx_access_token: None,
1635 prefix,
1636 }),
1637 decoders: self.decoders,
1638 commit_tracker: self.commit_tracker,
1639 on_commit_hooks: self.on_commit_hooks,
1640 capability: self.capability,
1641 }
1642 }
1643
1644 pub fn with_prefix_module_id<'a: 'tx>(
1648 self,
1649 module_instance_id: ModuleInstanceId,
1650 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1651 where
1652 'tx: 'a,
1653 {
1654 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1655 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1656 (
1657 DatabaseTransaction {
1658 tx: Box::new(PrefixDatabaseTransaction {
1659 inner: self.tx,
1660 global_dbtx_access_token: Some(global_dbtx_access_token),
1661 prefix,
1662 }),
1663 decoders: self.decoders,
1664 commit_tracker: self.commit_tracker,
1665 on_commit_hooks: self.on_commit_hooks,
1666 capability: self.capability,
1667 },
1668 global_dbtx_access_token,
1669 )
1670 }
1671
1672 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1674 where
1675 's: 'a,
1676 {
1677 let decoders = self.decoders.clone();
1678
1679 DatabaseTransaction {
1680 tx: Box::new(&mut self.tx),
1681 decoders,
1682 commit_tracker: match self.commit_tracker {
1683 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1684 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1685 },
1686 on_commit_hooks: match self.on_commit_hooks {
1687 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1688 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1689 },
1690 capability: self.capability,
1691 }
1692 }
1693
1694 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1696 where
1697 'tx: 'a,
1698 {
1699 DatabaseTransaction {
1700 tx: Box::new(PrefixDatabaseTransaction {
1701 inner: &mut self.tx,
1702 global_dbtx_access_token: None,
1703 prefix,
1704 }),
1705 decoders: self.decoders.clone(),
1706 commit_tracker: match self.commit_tracker {
1707 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1708 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1709 },
1710 on_commit_hooks: match self.on_commit_hooks {
1711 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1712 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1713 },
1714 capability: self.capability,
1715 }
1716 }
1717
1718 pub fn to_ref_with_prefix_module_id<'a>(
1719 &'a mut self,
1720 module_instance_id: ModuleInstanceId,
1721 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1722 where
1723 'tx: 'a,
1724 {
1725 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1726 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1727 (
1728 DatabaseTransaction {
1729 tx: Box::new(PrefixDatabaseTransaction {
1730 inner: &mut self.tx,
1731 global_dbtx_access_token: Some(global_dbtx_access_token),
1732 prefix,
1733 }),
1734 decoders: self.decoders.clone(),
1735 commit_tracker: match self.commit_tracker {
1736 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1737 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1738 },
1739 on_commit_hooks: match self.on_commit_hooks {
1740 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1741 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1742 },
1743 capability: self.capability,
1744 },
1745 global_dbtx_access_token,
1746 )
1747 }
1748
1749 pub fn is_global(&self) -> bool {
1751 self.tx.is_global()
1752 }
1753
1754 pub fn ensure_global(&self) -> Result<()> {
1756 if !self.is_global() {
1757 bail!("Database instance not global");
1758 }
1759
1760 Ok(())
1761 }
1762
1763 pub fn ensure_isolated(&self) -> Result<()> {
1765 if self.is_global() {
1766 bail!("Database instance not isolated");
1767 }
1768
1769 Ok(())
1770 }
1771
1772 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1774 self.commit_tracker.ignore_uncommitted = true;
1775 self
1776 }
1777
1778 pub fn warn_uncommitted(&mut self) -> &mut Self {
1780 self.commit_tracker.ignore_uncommitted = false;
1781 self
1782 }
1783
1784 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1786 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1787 self.on_commit_hooks.push(Box::new(f));
1788 }
1789
1790 pub fn global_dbtx<'a>(
1791 &'a mut self,
1792 access_token: GlobalDBTxAccessToken,
1793 ) -> DatabaseTransaction<'a, Cap>
1794 where
1795 'tx: 'a,
1796 {
1797 let decoders = self.decoders.clone();
1798
1799 DatabaseTransaction {
1800 tx: Box::new(self.tx.global_dbtx(access_token)),
1801 decoders,
1802 commit_tracker: match self.commit_tracker {
1803 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1804 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1805 },
1806 on_commit_hooks: match self.on_commit_hooks {
1807 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1808 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1809 },
1810 capability: self.capability,
1811 }
1812 }
1813}
1814
1815#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1817pub struct GlobalDBTxAccessToken(u32);
1818
1819impl GlobalDBTxAccessToken {
1820 fn from_prefix(prefix: &[u8]) -> Self {
1831 Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1832 }
1833}
1834
1835impl<'tx> DatabaseTransaction<'tx, Committable> {
1836 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1837 Self {
1838 tx: dbtx,
1839 decoders,
1840 commit_tracker: MaybeRef::Owned(CommitTracker {
1841 is_committed: false,
1842 has_writes: false,
1843 ignore_uncommitted: false,
1844 }),
1845 on_commit_hooks: MaybeRef::Owned(vec![]),
1846 capability: PhantomData,
1847 }
1848 }
1849
1850 pub async fn commit_tx_result(mut self) -> Result<()> {
1851 self.commit_tracker.is_committed = true;
1852 let commit_result = self.tx.commit_tx().await;
1853
1854 if commit_result.is_ok() {
1856 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1857 hook();
1858 }
1859 }
1860
1861 commit_result
1862 }
1863
1864 pub async fn commit_tx(mut self) {
1865 self.commit_tracker.is_committed = true;
1866 self.commit_tx_result()
1867 .await
1868 .expect("Unrecoverable error occurred while committing to the database.");
1869 }
1870}
1871
1872#[apply(async_trait_maybe_send!)]
1873impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1874where
1875 Cap: Send,
1876{
1877 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1878 self.commit_tracker.has_writes = true;
1879 self.tx.raw_insert_bytes(key, value).await
1880 }
1881
1882 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1883 self.tx.raw_get_bytes(key).await
1884 }
1885
1886 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1887 self.tx.raw_remove_entry(key).await
1888 }
1889
1890 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1891 self.tx.raw_find_by_range(key_range).await
1892 }
1893
1894 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1895 self.tx.raw_find_by_prefix(key_prefix).await
1896 }
1897
1898 async fn raw_find_by_prefix_sorted_descending(
1899 &mut self,
1900 key_prefix: &[u8],
1901 ) -> Result<PrefixStream<'_>> {
1902 self.tx
1903 .raw_find_by_prefix_sorted_descending(key_prefix)
1904 .await
1905 }
1906
1907 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1908 self.commit_tracker.has_writes = true;
1909 self.tx.raw_remove_by_prefix(key_prefix).await
1910 }
1911}
1912#[apply(async_trait_maybe_send!)]
1913impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1914 async fn set_tx_savepoint(&mut self) -> Result<()> {
1915 self.tx.set_tx_savepoint().await
1916 }
1917
1918 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1919 self.tx.rollback_tx_to_savepoint().await
1920 }
1921}
1922
1923impl<T> DatabaseKeyPrefix for T
1924where
1925 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1926{
1927 fn to_bytes(&self) -> Vec<u8> {
1928 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1929 data.append(&mut self.consensus_encode_to_vec());
1930 data
1931 }
1932}
1933
1934impl<T> DatabaseKey for T
1935where
1936 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1939{
1940 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1941 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1942 if data.is_empty() {
1943 return Err(DecodingError::wrong_length(1, 0));
1945 }
1946
1947 if data[0] != Self::DB_PREFIX {
1948 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1949 }
1950
1951 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1952 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1953 }
1954}
1955
1956impl<T> DatabaseValue for T
1957where
1958 T: Debug + Encodable + Decodable,
1959{
1960 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1961 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1962 }
1963
1964 fn to_bytes(&self) -> Vec<u8> {
1965 self.consensus_encode_to_vec()
1966 }
1967}
1968
1969#[macro_export]
2030macro_rules! impl_db_record {
2031 (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
2032 impl $crate::db::DatabaseRecord for $key {
2033 const DB_PREFIX: u8 = $db_prefix as u8;
2034 $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2035 type Key = Self;
2036 type Value = $val;
2037 }
2038 $(
2039 impl_db_record! {
2040 @impl_notify_marker key = $key, notify_on_modify = $notify
2041 }
2042 )?
2043 };
2044 (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2046 impl $crate::db::DatabaseKeyWithNotify for $key {}
2047 };
2048 (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2050}
2051
2052#[macro_export]
2053macro_rules! impl_db_lookup{
2054 (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2055 $(
2056 impl $crate::db::DatabaseLookup for $query_prefix {
2057 type Record = $key;
2058 }
2059 )*
2060 };
2061}
2062
2063#[derive(Debug, Encodable, Decodable, Serialize)]
2065pub struct DatabaseVersionKeyV0;
2066
2067#[derive(Debug, Encodable, Decodable, Serialize)]
2068pub struct DatabaseVersionKey(pub ModuleInstanceId);
2069
2070#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2071pub struct DatabaseVersion(pub u64);
2072
2073impl_db_record!(
2074 key = DatabaseVersionKeyV0,
2075 value = DatabaseVersion,
2076 db_prefix = DbKeyPrefix::DatabaseVersion
2077);
2078
2079impl_db_record!(
2080 key = DatabaseVersionKey,
2081 value = DatabaseVersion,
2082 db_prefix = DbKeyPrefix::DatabaseVersion
2083);
2084
2085impl std::fmt::Display for DatabaseVersion {
2086 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2087 write!(f, "{}", self.0)
2088 }
2089}
2090
2091impl DatabaseVersion {
2092 pub fn increment(&self) -> Self {
2093 Self(self.0 + 1)
2094 }
2095}
2096
2097impl std::fmt::Display for DbKeyPrefix {
2098 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2099 write!(f, "{self:?}")
2100 }
2101}
2102
2103#[repr(u8)]
2104#[derive(Clone, EnumIter, Debug)]
2105pub enum DbKeyPrefix {
2106 DatabaseVersion = 0x50,
2107 ClientBackup = 0x51,
2108}
2109
2110#[derive(Debug, Error)]
2111pub enum DecodingError {
2112 #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2113 WrongPrefix { expected: u8, found: u8 },
2114 #[error("Key had a wrong length, expected {expected} but got {found}")]
2115 WrongLength { expected: usize, found: usize },
2116 #[error("Other decoding error: {0:#}")]
2117 Other(anyhow::Error),
2118}
2119
2120impl DecodingError {
2121 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2122 Self::Other(anyhow::Error::from(error))
2123 }
2124
2125 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2126 Self::WrongPrefix { expected, found }
2127 }
2128
2129 pub fn wrong_length(expected: usize, found: usize) -> Self {
2130 Self::WrongLength { expected, found }
2131 }
2132}
2133
2134#[macro_export]
2135macro_rules! push_db_pair_items {
2136 ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2137 let db_items =
2138 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2139 .await
2140 .map(|(key, val)| {
2141 (
2142 $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2143 val,
2144 )
2145 })
2146 .collect::<BTreeMap<String, $value_type>>()
2147 .await;
2148
2149 $map.insert($key_literal.to_string(), Box::new(db_items));
2150 };
2151}
2152
2153#[macro_export]
2154macro_rules! push_db_key_items {
2155 ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => {
2156 let db_items =
2157 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2158 .await
2159 .map(|(key, _)| key)
2160 .collect::<Vec<$key_type>>()
2161 .await;
2162
2163 $map.insert($key_literal.to_string(), Box::new(db_items));
2164 };
2165}
2166
2167pub type CoreMigrationFn = for<'tx> fn(
2170 MigrationContext<'tx>,
2171) -> Pin<
2172 Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2173>;
2174
2175pub fn get_current_database_version<F>(
2179 migrations: &BTreeMap<DatabaseVersion, F>,
2180) -> DatabaseVersion {
2181 let versions = migrations.keys().copied().collect::<Vec<_>>();
2182
2183 if !versions
2186 .windows(2)
2187 .all(|window| window[0].increment() == window[1])
2188 {
2189 panic!("Database Migrations are not defined contiguously");
2190 }
2191
2192 versions
2193 .last()
2194 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2195}
2196
2197pub async fn apply_migrations_server(
2199 db: &Database,
2200 kind: String,
2201 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2202) -> Result<(), anyhow::Error> {
2203 let mut global_dbtx = db.begin_transaction().await;
2204 global_dbtx.ensure_global()?;
2205 apply_migrations_server_dbtx(&mut global_dbtx.to_ref_nc(), kind, migrations).await?;
2206 global_dbtx.commit_tx_result().await
2207}
2208
2209pub async fn apply_migrations_server_dbtx(
2211 global_dbtx: &mut DatabaseTransaction<'_>,
2212 kind: String,
2213 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2214) -> Result<(), anyhow::Error> {
2215 global_dbtx.ensure_global()?;
2216 apply_migrations_dbtx(global_dbtx, kind, migrations, None, None).await
2217}
2218
2219pub async fn apply_migrations(
2220 db: &Database,
2221 kind: String,
2222 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2223 module_instance_id: Option<ModuleInstanceId>,
2224 external_prefixes_above: Option<u8>,
2227) -> Result<(), anyhow::Error> {
2228 let mut dbtx = db.begin_transaction().await;
2229 apply_migrations_dbtx(
2230 &mut dbtx.to_ref_nc(),
2231 kind,
2232 migrations,
2233 module_instance_id,
2234 external_prefixes_above,
2235 )
2236 .await?;
2237
2238 dbtx.commit_tx_result().await
2239}
2240pub async fn apply_migrations_dbtx(
2252 global_dbtx: &mut DatabaseTransaction<'_>,
2253 kind: String,
2254 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2255 module_instance_id: Option<ModuleInstanceId>,
2256 external_prefixes_above: Option<u8>,
2259) -> Result<(), anyhow::Error> {
2260 let is_new_db = global_dbtx
2263 .raw_find_by_prefix(&[])
2264 .await?
2265 .filter(|(key, _v)| {
2266 std::future::ready(
2267 external_prefixes_above.map_or(true, |external_prefixes_above| {
2268 !key.is_empty() && key[0] < external_prefixes_above
2269 }),
2270 )
2271 })
2272 .next()
2273 .await
2274 .is_none();
2275
2276 let target_db_version = get_current_database_version(&migrations);
2277
2278 create_database_version_dbtx(
2280 global_dbtx,
2281 target_db_version,
2282 module_instance_id,
2283 kind.clone(),
2284 is_new_db,
2285 )
2286 .await?;
2287
2288 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2289
2290 let disk_version = global_dbtx
2291 .get_value(&DatabaseVersionKey(module_instance_id_key))
2292 .await;
2293
2294 let db_version = if let Some(disk_version) = disk_version {
2295 let mut current_db_version = disk_version;
2296
2297 if current_db_version > target_db_version {
2298 return Err(anyhow::anyhow!(format!(
2299 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2300 )));
2301 }
2302
2303 while current_db_version < target_db_version {
2304 if let Some(migration) = migrations.get(¤t_db_version) {
2305 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2306 migration(MigrationContext {
2307 dbtx: global_dbtx.to_ref_nc(),
2308 module_instance_id,
2309 })
2310 .await?;
2311 } else {
2312 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2313 }
2314
2315 current_db_version = current_db_version.increment();
2316 global_dbtx
2317 .insert_entry(
2318 &DatabaseVersionKey(module_instance_id_key),
2319 ¤t_db_version,
2320 )
2321 .await;
2322 }
2323
2324 current_db_version
2325 } else {
2326 target_db_version
2327 };
2328
2329 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2330 Ok(())
2331}
2332
2333pub async fn create_database_version(
2334 db: &Database,
2335 target_db_version: DatabaseVersion,
2336 module_instance_id: Option<ModuleInstanceId>,
2337 kind: String,
2338 is_new_db: bool,
2339) -> Result<(), anyhow::Error> {
2340 let mut dbtx = db.begin_transaction().await;
2341
2342 create_database_version_dbtx(
2343 &mut dbtx.to_ref_nc(),
2344 target_db_version,
2345 module_instance_id,
2346 kind,
2347 is_new_db,
2348 )
2349 .await?;
2350
2351 dbtx.commit_tx_result().await?;
2352 Ok(())
2353}
2354
2355pub async fn create_database_version_dbtx(
2359 global_dbtx: &mut DatabaseTransaction<'_>,
2360 target_db_version: DatabaseVersion,
2361 module_instance_id: Option<ModuleInstanceId>,
2362 kind: String,
2363 is_new_db: bool,
2364) -> Result<(), anyhow::Error> {
2365 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2366
2367 if global_dbtx
2371 .get_value(&DatabaseVersionKey(key_module_instance_id))
2372 .await
2373 .is_none()
2374 {
2375 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2384 remove_current_db_version_if_exists(
2385 &mut global_dbtx
2386 .to_ref_with_prefix_module_id(module_instance_id)
2387 .0
2388 .into_nc(),
2389 is_new_db,
2390 target_db_version,
2391 )
2392 .await
2393 } else {
2394 remove_current_db_version_if_exists(
2395 &mut global_dbtx.to_ref().into_nc(),
2396 is_new_db,
2397 target_db_version,
2398 )
2399 .await
2400 };
2401
2402 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2404 global_dbtx
2405 .insert_new_entry(
2406 &DatabaseVersionKey(key_module_instance_id),
2407 ¤t_version_in_module,
2408 )
2409 .await;
2410 }
2411
2412 Ok(())
2413}
2414
2415async fn remove_current_db_version_if_exists(
2420 version_dbtx: &mut DatabaseTransaction<'_>,
2421 is_new_db: bool,
2422 target_db_version: DatabaseVersion,
2423) -> DatabaseVersion {
2424 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2428 match current_version_in_module {
2429 Some(database_version) => database_version,
2430 None if is_new_db => target_db_version,
2431 None => DatabaseVersion(0),
2432 }
2433}
2434
2435fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2438 module_instance_id.map_or_else(
2440 || MODULE_GLOBAL_PREFIX.into(),
2441 |module_instance_id| module_instance_id,
2442 )
2443}
2444
2445pub struct MigrationContext<'tx> {
2446 dbtx: DatabaseTransaction<'tx>,
2447 module_instance_id: Option<ModuleInstanceId>,
2448}
2449
2450impl<'tx> MigrationContext<'tx> {
2451 pub fn dbtx(&mut self) -> DatabaseTransaction {
2452 if let Some(module_instance_id) = self.module_instance_id {
2453 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2454 } else {
2455 self.dbtx.to_ref_nc()
2456 }
2457 }
2458
2459 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2460 self.module_instance_id
2461 }
2462
2463 #[doc(hidden)]
2464 pub fn __global_dbtx(&mut self) -> &mut DatabaseTransaction<'tx> {
2465 &mut self.dbtx
2466 }
2467}
2468
2469#[allow(unused_imports)]
2470mod test_utils {
2471 use std::collections::BTreeMap;
2472 use std::time::Duration;
2473
2474 use fedimint_core::db::MigrationContext;
2475 use futures::future::ready;
2476 use futures::{Future, FutureExt, StreamExt};
2477 use rand::Rng;
2478 use tokio::join;
2479
2480 use super::{
2481 apply_migrations, CoreMigrationFn, Database, DatabaseTransaction, DatabaseVersion,
2482 DatabaseVersionKey, DatabaseVersionKeyV0,
2483 };
2484 use crate::core::ModuleKind;
2485 use crate::db::mem_impl::MemDatabase;
2486 use crate::db::{
2487 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2488 };
2489 use crate::encoding::{Decodable, Encodable};
2490 use crate::module::registry::ModuleDecoderRegistry;
2491
2492 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2493 crate::runtime::timeout(Duration::from_millis(10), fut)
2494 .await
2495 .ok()
2496 }
2497
2498 #[repr(u8)]
2499 #[derive(Clone)]
2500 pub enum TestDbKeyPrefix {
2501 Test = 0x42,
2502 AltTest = 0x43,
2503 PercentTestKey = 0x25,
2504 }
2505
2506 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2507 pub(super) struct TestKey(pub u64);
2508
2509 #[derive(Debug, Encodable, Decodable)]
2510 struct DbPrefixTestPrefix;
2511
2512 impl_db_record!(
2513 key = TestKey,
2514 value = TestVal,
2515 db_prefix = TestDbKeyPrefix::Test,
2516 notify_on_modify = true,
2517 );
2518 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2519
2520 #[derive(Debug, Encodable, Decodable)]
2521 struct TestKeyV0(u64, u64);
2522
2523 #[derive(Debug, Encodable, Decodable)]
2524 struct DbPrefixTestPrefixV0;
2525
2526 impl_db_record!(
2527 key = TestKeyV0,
2528 value = TestVal,
2529 db_prefix = TestDbKeyPrefix::Test,
2530 );
2531 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2532
2533 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2534 struct AltTestKey(u64);
2535
2536 #[derive(Debug, Encodable, Decodable)]
2537 struct AltDbPrefixTestPrefix;
2538
2539 impl_db_record!(
2540 key = AltTestKey,
2541 value = TestVal,
2542 db_prefix = TestDbKeyPrefix::AltTest,
2543 );
2544 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2545
2546 #[derive(Debug, Encodable, Decodable)]
2547 struct PercentTestKey(u64);
2548
2549 #[derive(Debug, Encodable, Decodable)]
2550 struct PercentPrefixTestPrefix;
2551
2552 impl_db_record!(
2553 key = PercentTestKey,
2554 value = TestVal,
2555 db_prefix = TestDbKeyPrefix::PercentTestKey,
2556 );
2557
2558 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2559 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2560 pub(super) struct TestVal(pub u64);
2561
2562 const TEST_MODULE_PREFIX: u16 = 1;
2563 const ALT_MODULE_PREFIX: u16 = 2;
2564
2565 pub async fn verify_insert_elements(db: Database) {
2566 let mut dbtx = db.begin_transaction().await;
2567 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2568 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2569 dbtx.commit_tx().await;
2570
2571 let mut dbtx = db.begin_transaction().await;
2573 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2574 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2575 dbtx.commit_tx().await;
2576
2577 let mut dbtx = db.begin_transaction().await;
2579 assert_eq!(
2580 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2581 Some(TestVal(2))
2582 );
2583 assert_eq!(
2584 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2585 Some(TestVal(3))
2586 );
2587 dbtx.commit_tx().await;
2588
2589 let mut dbtx = db.begin_transaction().await;
2590 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2591 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2592 dbtx.commit_tx().await;
2593 }
2594
2595 pub async fn verify_remove_nonexisting(db: Database) {
2596 let mut dbtx = db.begin_transaction().await;
2597 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2598 let removed = dbtx.remove_entry(&TestKey(1)).await;
2599 assert!(removed.is_none());
2600
2601 dbtx.commit_tx().await;
2603 }
2604
2605 pub async fn verify_remove_existing(db: Database) {
2606 let mut dbtx = db.begin_transaction().await;
2607
2608 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2609
2610 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2611
2612 let removed = dbtx.remove_entry(&TestKey(1)).await;
2613 assert_eq!(removed, Some(TestVal(2)));
2614 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2615
2616 dbtx.commit_tx().await;
2618 }
2619
2620 pub async fn verify_read_own_writes(db: Database) {
2621 let mut dbtx = db.begin_transaction().await;
2622
2623 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2624
2625 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2626
2627 dbtx.commit_tx().await;
2629 }
2630
2631 pub async fn verify_prevent_dirty_reads(db: Database) {
2632 let mut dbtx = db.begin_transaction().await;
2633
2634 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2635
2636 let mut dbtx2 = db.begin_transaction().await;
2638 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2639
2640 dbtx.commit_tx().await;
2642 }
2643
2644 pub async fn verify_find_by_range(db: Database) {
2645 let mut dbtx = db.begin_transaction().await;
2646 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2647 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2648 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2649
2650 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2651 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2652
2653 {
2654 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2655 module_dbtx
2656 .insert_entry(&TestKey(300), &TestVal(3000))
2657 .await;
2658 }
2659
2660 dbtx.commit_tx().await;
2661
2662 let mut dbtx = db.begin_transaction_nc().await;
2664
2665 let returned_keys = dbtx
2666 .find_by_range(TestKey(55)..TestKey(56))
2667 .await
2668 .collect::<Vec<_>>()
2669 .await;
2670
2671 let expected = vec![(TestKey(55), TestVal(9999))];
2672
2673 assert_eq!(returned_keys, expected);
2674
2675 let returned_keys = dbtx
2676 .find_by_range(TestKey(54)..TestKey(56))
2677 .await
2678 .collect::<Vec<_>>()
2679 .await;
2680
2681 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2682 assert_eq!(returned_keys, expected);
2683
2684 let returned_keys = dbtx
2685 .find_by_range(TestKey(54)..TestKey(57))
2686 .await
2687 .collect::<Vec<_>>()
2688 .await;
2689
2690 let expected = vec![
2691 (TestKey(54), TestVal(8888)),
2692 (TestKey(55), TestVal(9999)),
2693 (TestKey(56), TestVal(7777)),
2694 ];
2695 assert_eq!(returned_keys, expected);
2696
2697 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2698 let test_range = module_dbtx
2699 .find_by_range(TestKey(300)..TestKey(301))
2700 .await
2701 .collect::<Vec<_>>()
2702 .await;
2703 assert!(test_range.len() == 1);
2704 }
2705
2706 pub async fn verify_find_by_prefix(db: Database) {
2707 let mut dbtx = db.begin_transaction().await;
2708 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2709 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2710
2711 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2712 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2713 dbtx.commit_tx().await;
2714
2715 let mut dbtx = db.begin_transaction().await;
2717
2718 let returned_keys = dbtx
2719 .find_by_prefix(&DbPrefixTestPrefix)
2720 .await
2721 .collect::<Vec<_>>()
2722 .await;
2723
2724 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2725 assert_eq!(returned_keys, expected);
2726
2727 let reversed = dbtx
2728 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2729 .await
2730 .collect::<Vec<_>>()
2731 .await;
2732 let mut reversed_expected = expected;
2733 reversed_expected.reverse();
2734 assert_eq!(reversed, reversed_expected);
2735
2736 let returned_keys = dbtx
2737 .find_by_prefix(&AltDbPrefixTestPrefix)
2738 .await
2739 .collect::<Vec<_>>()
2740 .await;
2741
2742 let expected = vec![
2743 (AltTestKey(54), TestVal(6666)),
2744 (AltTestKey(55), TestVal(7777)),
2745 ];
2746 assert_eq!(returned_keys, expected);
2747
2748 let reversed = dbtx
2749 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2750 .await
2751 .collect::<Vec<_>>()
2752 .await;
2753 let mut reversed_expected = expected;
2754 reversed_expected.reverse();
2755 assert_eq!(reversed, reversed_expected);
2756 }
2757
2758 pub async fn verify_commit(db: Database) {
2759 let mut dbtx = db.begin_transaction().await;
2760
2761 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2762 dbtx.commit_tx().await;
2763
2764 let mut dbtx2 = db.begin_transaction().await;
2766 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2767 }
2768
2769 pub async fn verify_rollback_to_savepoint(db: Database) {
2770 let mut dbtx_rollback = db.begin_transaction().await;
2771
2772 dbtx_rollback
2773 .insert_entry(&TestKey(20), &TestVal(2000))
2774 .await;
2775
2776 dbtx_rollback
2777 .set_tx_savepoint()
2778 .await
2779 .expect("Error setting transaction savepoint");
2780
2781 dbtx_rollback
2782 .insert_entry(&TestKey(21), &TestVal(2001))
2783 .await;
2784
2785 assert_eq!(
2786 dbtx_rollback.get_value(&TestKey(20)).await,
2787 Some(TestVal(2000))
2788 );
2789 assert_eq!(
2790 dbtx_rollback.get_value(&TestKey(21)).await,
2791 Some(TestVal(2001))
2792 );
2793
2794 dbtx_rollback
2795 .rollback_tx_to_savepoint()
2796 .await
2797 .expect("Error setting transaction savepoint");
2798
2799 assert_eq!(
2800 dbtx_rollback.get_value(&TestKey(20)).await,
2801 Some(TestVal(2000))
2802 );
2803
2804 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2805
2806 dbtx_rollback.commit_tx().await;
2808 }
2809
2810 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2811 let mut dbtx = db.begin_transaction().await;
2812 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2813
2814 let mut dbtx2 = db.begin_transaction().await;
2815
2816 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2817
2818 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2819
2820 dbtx2.commit_tx().await;
2821
2822 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2825
2826 let expected_keys = 0;
2827 let returned_keys = dbtx
2828 .find_by_prefix(&DbPrefixTestPrefix)
2829 .await
2830 .fold(0, |returned_keys, (key, value)| async move {
2831 if key == TestKey(100) {
2832 assert!(value.eq(&TestVal(101)));
2833 }
2834 returned_keys + 1
2835 })
2836 .await;
2837
2838 assert_eq!(returned_keys, expected_keys);
2839 }
2840
2841 pub async fn verify_snapshot_isolation(db: Database) {
2842 async fn random_yield() {
2843 let times = if rand::thread_rng().gen_bool(0.5) {
2844 0
2845 } else {
2846 10
2847 };
2848 for _ in 0..times {
2849 tokio::task::yield_now().await;
2850 }
2851 }
2852
2853 for i in 0..1000 {
2855 let base_key = i * 2;
2856 let tx_accepted_key = base_key;
2857 let spent_input_key = base_key + 1;
2858
2859 join!(
2860 async {
2861 random_yield().await;
2862 let mut dbtx = db.begin_transaction().await;
2863
2864 random_yield().await;
2865 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2866 random_yield().await;
2867 let s = match i % 5 {
2870 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2871 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2872 2 => {
2873 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2874 .await
2875 }
2876 3 => {
2877 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2878 .await
2879 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2880 .map(|(_k, v)| v)
2881 .next()
2882 .await
2883 }
2884 4 => {
2885 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2886 .await
2887 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2888 .map(|(_k, v)| v)
2889 .next()
2890 .await
2891 }
2892 _ => {
2893 panic!("woot?");
2894 }
2895 };
2896
2897 match (a, s) {
2898 (None, None) | (Some(_), Some(_)) => {}
2899 (None, Some(_)) => panic!("none some?! {i}"),
2900 (Some(_), None) => panic!("some none?! {i}"),
2901 }
2902 },
2903 async {
2904 random_yield().await;
2905
2906 let mut dbtx = db.begin_transaction().await;
2907 random_yield().await;
2908 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2909
2910 random_yield().await;
2911 assert_eq!(
2912 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2913 .await,
2914 None
2915 );
2916
2917 random_yield().await;
2918 assert_eq!(
2919 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2920 .await,
2921 None
2922 );
2923 random_yield().await;
2924 dbtx.commit_tx().await;
2925 }
2926 );
2927 }
2928 }
2929
2930 pub async fn verify_phantom_entry(db: Database) {
2931 let mut dbtx = db.begin_transaction().await;
2932
2933 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2934
2935 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2936
2937 dbtx.commit_tx().await;
2938
2939 let mut dbtx = db.begin_transaction().await;
2940 let expected_keys = 2;
2941 let returned_keys = dbtx
2942 .find_by_prefix(&DbPrefixTestPrefix)
2943 .await
2944 .fold(0, |returned_keys, (key, value)| async move {
2945 match key {
2946 TestKey(100) => {
2947 assert!(value.eq(&TestVal(101)));
2948 }
2949 TestKey(101) => {
2950 assert!(value.eq(&TestVal(102)));
2951 }
2952 _ => {}
2953 };
2954 returned_keys + 1
2955 })
2956 .await;
2957
2958 assert_eq!(returned_keys, expected_keys);
2959
2960 let mut dbtx2 = db.begin_transaction().await;
2961
2962 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2963
2964 dbtx2.commit_tx().await;
2965
2966 let returned_keys = dbtx
2967 .find_by_prefix(&DbPrefixTestPrefix)
2968 .await
2969 .fold(0, |returned_keys, (key, value)| async move {
2970 match key {
2971 TestKey(100) => {
2972 assert!(value.eq(&TestVal(101)));
2973 }
2974 TestKey(101) => {
2975 assert!(value.eq(&TestVal(102)));
2976 }
2977 _ => {}
2978 };
2979 returned_keys + 1
2980 })
2981 .await;
2982
2983 assert_eq!(returned_keys, expected_keys);
2984 }
2985
2986 pub async fn expect_write_conflict(db: Database) {
2987 let mut dbtx = db.begin_transaction().await;
2988 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2989 dbtx.commit_tx().await;
2990
2991 let mut dbtx2 = db.begin_transaction().await;
2992 let mut dbtx3 = db.begin_transaction().await;
2993
2994 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2995
2996 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3000
3001 dbtx2.commit_tx().await;
3002 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3003 }
3004
3005 pub async fn verify_string_prefix(db: Database) {
3006 let mut dbtx = db.begin_transaction().await;
3007 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3008
3009 assert_eq!(
3010 dbtx.get_value(&PercentTestKey(100)).await,
3011 Some(TestVal(101))
3012 );
3013
3014 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3015
3016 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3017
3018 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3019
3020 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3023
3024 let expected_keys = 4;
3025 let returned_keys = dbtx
3026 .find_by_prefix(&PercentPrefixTestPrefix)
3027 .await
3028 .fold(0, |returned_keys, (key, value)| async move {
3029 if matches!(key, PercentTestKey(101)) {
3030 assert!(value.eq(&TestVal(100)));
3031 }
3032 returned_keys + 1
3033 })
3034 .await;
3035
3036 assert_eq!(returned_keys, expected_keys);
3037 }
3038
3039 pub async fn verify_remove_by_prefix(db: Database) {
3040 let mut dbtx = db.begin_transaction().await;
3041
3042 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3043
3044 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3045
3046 dbtx.commit_tx().await;
3047
3048 let mut remove_dbtx = db.begin_transaction().await;
3049 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3050 remove_dbtx.commit_tx().await;
3051
3052 let mut dbtx = db.begin_transaction().await;
3053 let expected_keys = 0;
3054 let returned_keys = dbtx
3055 .find_by_prefix(&DbPrefixTestPrefix)
3056 .await
3057 .fold(0, |returned_keys, (key, value)| async move {
3058 match key {
3059 TestKey(100) => {
3060 assert!(value.eq(&TestVal(101)));
3061 }
3062 TestKey(101) => {
3063 assert!(value.eq(&TestVal(102)));
3064 }
3065 _ => {}
3066 };
3067 returned_keys + 1
3068 })
3069 .await;
3070
3071 assert_eq!(returned_keys, expected_keys);
3072 }
3073
3074 pub async fn verify_module_db(db: Database, module_db: Database) {
3075 let mut dbtx = db.begin_transaction().await;
3076
3077 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3078
3079 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3080
3081 dbtx.commit_tx().await;
3082
3083 let mut module_dbtx = module_db.begin_transaction().await;
3085 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3086
3087 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3088
3089 let mut dbtx = db.begin_transaction().await;
3091 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3092
3093 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3094
3095 let mut module_dbtx = module_db.begin_transaction().await;
3096
3097 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3098
3099 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3100
3101 module_dbtx.commit_tx().await;
3102
3103 let expected_keys = 2;
3104 let mut dbtx = db.begin_transaction().await;
3105 let returned_keys = dbtx
3106 .find_by_prefix(&DbPrefixTestPrefix)
3107 .await
3108 .fold(0, |returned_keys, (key, value)| async move {
3109 match key {
3110 TestKey(100) => {
3111 assert!(value.eq(&TestVal(101)));
3112 }
3113 TestKey(101) => {
3114 assert!(value.eq(&TestVal(102)));
3115 }
3116 _ => {}
3117 };
3118 returned_keys + 1
3119 })
3120 .await;
3121
3122 assert_eq!(returned_keys, expected_keys);
3123
3124 let removed = dbtx.remove_entry(&TestKey(100)).await;
3125 assert_eq!(removed, Some(TestVal(101)));
3126 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3127
3128 let mut module_dbtx = module_db.begin_transaction().await;
3129 assert_eq!(
3130 module_dbtx.get_value(&TestKey(100)).await,
3131 Some(TestVal(103))
3132 );
3133 }
3134
3135 pub async fn verify_module_prefix(db: Database) {
3136 let mut test_dbtx = db.begin_transaction().await;
3137 {
3138 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3139
3140 test_module_dbtx
3141 .insert_entry(&TestKey(100), &TestVal(101))
3142 .await;
3143
3144 test_module_dbtx
3145 .insert_entry(&TestKey(101), &TestVal(102))
3146 .await;
3147 }
3148
3149 test_dbtx.commit_tx().await;
3150
3151 let mut alt_dbtx = db.begin_transaction().await;
3152 {
3153 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3154
3155 alt_module_dbtx
3156 .insert_entry(&TestKey(100), &TestVal(103))
3157 .await;
3158
3159 alt_module_dbtx
3160 .insert_entry(&TestKey(101), &TestVal(104))
3161 .await;
3162 }
3163
3164 alt_dbtx.commit_tx().await;
3165
3166 let mut test_dbtx = db.begin_transaction().await;
3168 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3169 assert_eq!(
3170 test_module_dbtx.get_value(&TestKey(100)).await,
3171 Some(TestVal(101))
3172 );
3173
3174 assert_eq!(
3175 test_module_dbtx.get_value(&TestKey(101)).await,
3176 Some(TestVal(102))
3177 );
3178
3179 let expected_keys = 2;
3180 let returned_keys = test_module_dbtx
3181 .find_by_prefix(&DbPrefixTestPrefix)
3182 .await
3183 .fold(0, |returned_keys, (key, value)| async move {
3184 match key {
3185 TestKey(100) => {
3186 assert!(value.eq(&TestVal(101)));
3187 }
3188 TestKey(101) => {
3189 assert!(value.eq(&TestVal(102)));
3190 }
3191 _ => {}
3192 };
3193 returned_keys + 1
3194 })
3195 .await;
3196
3197 assert_eq!(returned_keys, expected_keys);
3198
3199 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3200 assert_eq!(removed, Some(TestVal(101)));
3201 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3202
3203 let mut test_dbtx = db.begin_transaction().await;
3206 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3207
3208 test_dbtx.commit_tx().await;
3209 }
3210
3211 #[cfg(test)]
3212 #[tokio::test]
3213 pub async fn verify_test_migration() {
3214 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3216 let expected_test_keys_size: usize = 100;
3217 let mut dbtx = db.begin_transaction().await;
3218 for i in 0..expected_test_keys_size {
3219 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3220 .await;
3221 }
3222
3223 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3225 .await;
3226 dbtx.commit_tx().await;
3227
3228 let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
3229
3230 migrations.insert(DatabaseVersion(0), |ctx| {
3231 migrate_test_db_version_0(ctx).boxed()
3232 });
3233
3234 apply_migrations(&db, "TestModule".to_string(), migrations, None, None)
3235 .await
3236 .expect("Error applying migrations for TestModule");
3237
3238 let mut dbtx = db.begin_transaction().await;
3240
3241 assert!(dbtx
3244 .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3245 .await
3246 .is_some());
3247
3248 let test_keys = dbtx
3250 .find_by_prefix(&DbPrefixTestPrefix)
3251 .await
3252 .collect::<Vec<_>>()
3253 .await;
3254 let test_keys_size = test_keys.len();
3255 assert_eq!(test_keys_size, expected_test_keys_size);
3256 for (key, val) in test_keys {
3257 assert_eq!(key.0, val.0 + 1);
3258 }
3259 }
3260
3261 #[allow(dead_code)]
3262 async fn migrate_test_db_version_0(mut ctx: MigrationContext<'_>) -> Result<(), anyhow::Error> {
3263 let mut dbtx = ctx.dbtx();
3264 let example_keys_v0 = dbtx
3265 .find_by_prefix(&DbPrefixTestPrefixV0)
3266 .await
3267 .collect::<Vec<_>>()
3268 .await;
3269 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3270 for (key, val) in example_keys_v0 {
3271 let key_v2 = TestKey(key.1);
3272 dbtx.insert_new_entry(&key_v2, &val).await;
3273 }
3274 Ok(())
3275 }
3276
3277 #[cfg(test)]
3278 #[tokio::test]
3279 async fn test_autocommit() {
3280 use std::marker::PhantomData;
3281 use std::ops::Range;
3282 use std::path::Path;
3283
3284 use anyhow::anyhow;
3285 use async_trait::async_trait;
3286
3287 use crate::db::{
3288 AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3289 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3290 IRawDatabaseTransaction,
3291 };
3292 use crate::ModuleDecoderRegistry;
3293
3294 #[derive(Debug)]
3295 struct FakeDatabase;
3296
3297 #[async_trait]
3298 impl IRawDatabase for FakeDatabase {
3299 type Transaction<'a> = FakeTransaction<'a>;
3300 async fn begin_transaction(&self) -> FakeTransaction {
3301 FakeTransaction(PhantomData)
3302 }
3303
3304 fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3305 Ok(())
3306 }
3307 }
3308
3309 #[derive(Debug)]
3310 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3311
3312 #[async_trait]
3313 impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
3314 async fn raw_insert_bytes(
3315 &mut self,
3316 _key: &[u8],
3317 _value: &[u8],
3318 ) -> anyhow::Result<Option<Vec<u8>>> {
3319 unimplemented!()
3320 }
3321
3322 async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3323 unimplemented!()
3324 }
3325
3326 async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3327 unimplemented!()
3328 }
3329
3330 async fn raw_find_by_range(
3331 &mut self,
3332 _key_range: Range<&[u8]>,
3333 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3334 unimplemented!()
3335 }
3336
3337 async fn raw_find_by_prefix(
3338 &mut self,
3339 _key_prefix: &[u8],
3340 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3341 unimplemented!()
3342 }
3343
3344 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3345 unimplemented!()
3346 }
3347
3348 async fn raw_find_by_prefix_sorted_descending(
3349 &mut self,
3350 _key_prefix: &[u8],
3351 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3352 unimplemented!()
3353 }
3354 }
3355
3356 #[async_trait]
3357 impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
3358 async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3359 unimplemented!()
3360 }
3361
3362 async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3363 unimplemented!()
3364 }
3365 }
3366
3367 #[async_trait]
3368 impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
3369 async fn commit_tx(self) -> anyhow::Result<()> {
3370 Err(anyhow!("Can't commit!"))
3371 }
3372 }
3373
3374 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3375 let err = db
3376 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3377 .await
3378 .unwrap_err();
3379
3380 match err {
3381 AutocommitError::CommitFailed {
3382 attempts: failed_attempts,
3383 ..
3384 } => {
3385 assert_eq!(failed_attempts, 5);
3386 }
3387 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3388 }
3389 }
3390}
3391
3392pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3393 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3394 decoders: ModuleDecoderRegistry,
3395 key_prefix: &KP,
3396) -> impl Stream<
3397 Item = (
3398 KP::Record,
3399 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3400 ),
3401> + 'r
3402where
3403 'inner: 'r,
3404 KP: DatabaseLookup,
3405 KP::Record: DatabaseKey,
3406{
3407 debug!(target: LOG_DB, "find by prefix sorted descending");
3408 let prefix_bytes = key_prefix.to_bytes();
3409 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3410 .await
3411 .expect("Error doing prefix search in database")
3412 .map(move |(key_bytes, value_bytes)| {
3413 let key = decode_key_expect(&key_bytes, &decoders);
3414 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3415 (key, value)
3416 })
3417}
3418
3419#[cfg(test)]
3420mod tests {
3421 use tokio::sync::oneshot;
3422
3423 use super::mem_impl::MemDatabase;
3424 use super::*;
3425 use crate::runtime::spawn;
3426
3427 async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> {
3428 let db = db.clone();
3429 let (tx, rx) = oneshot::channel::<()>();
3430 let join_handle = spawn("wait key exists", async move {
3431 let sub = db.wait_key_exists(&key);
3432 tx.send(()).unwrap();
3433 sub.await
3434 });
3435 rx.await.unwrap();
3436 join_handle
3437 }
3438
3439 #[tokio::test]
3440 async fn test_wait_key_before_transaction() {
3441 let key = TestKey(1);
3442 let val = TestVal(2);
3443 let db = MemDatabase::new().into_database();
3444
3445 let key_task = waiter(&db, TestKey(1)).await;
3446
3447 let mut tx = db.begin_transaction().await;
3448 tx.insert_new_entry(&key, &val).await;
3449 tx.commit_tx().await;
3450
3451 assert_eq!(
3452 future_returns_shortly(async { key_task.await.unwrap() }).await,
3453 Some(TestVal(2)),
3454 "should notify"
3455 );
3456 }
3457
3458 #[tokio::test]
3459 async fn test_wait_key_before_insert() {
3460 let key = TestKey(1);
3461 let val = TestVal(2);
3462 let db = MemDatabase::new().into_database();
3463
3464 let mut tx = db.begin_transaction().await;
3465 let key_task = waiter(&db, TestKey(1)).await;
3466 tx.insert_new_entry(&key, &val).await;
3467 tx.commit_tx().await;
3468
3469 assert_eq!(
3470 future_returns_shortly(async { key_task.await.unwrap() }).await,
3471 Some(TestVal(2)),
3472 "should notify"
3473 );
3474 }
3475
3476 #[tokio::test]
3477 async fn test_wait_key_after_insert() {
3478 let key = TestKey(1);
3479 let val = TestVal(2);
3480 let db = MemDatabase::new().into_database();
3481
3482 let mut tx = db.begin_transaction().await;
3483 tx.insert_new_entry(&key, &val).await;
3484
3485 let key_task = waiter(&db, TestKey(1)).await;
3486
3487 tx.commit_tx().await;
3488
3489 assert_eq!(
3490 future_returns_shortly(async { key_task.await.unwrap() }).await,
3491 Some(TestVal(2)),
3492 "should notify"
3493 );
3494 }
3495
3496 #[tokio::test]
3497 async fn test_wait_key_after_commit() {
3498 let key = TestKey(1);
3499 let val = TestVal(2);
3500 let db = MemDatabase::new().into_database();
3501
3502 let mut tx = db.begin_transaction().await;
3503 tx.insert_new_entry(&key, &val).await;
3504 tx.commit_tx().await;
3505
3506 let key_task = waiter(&db, TestKey(1)).await;
3507 assert_eq!(
3508 future_returns_shortly(async { key_task.await.unwrap() }).await,
3509 Some(TestVal(2)),
3510 "should notify"
3511 );
3512 }
3513
3514 #[tokio::test]
3515 async fn test_wait_key_isolated_db() {
3516 let module_instance_id = 10;
3517 let key = TestKey(1);
3518 let val = TestVal(2);
3519 let db = MemDatabase::new().into_database();
3520 let db = db.with_prefix_module_id(module_instance_id).0;
3521
3522 let key_task = waiter(&db, TestKey(1)).await;
3523
3524 let mut tx = db.begin_transaction().await;
3525 tx.insert_new_entry(&key, &val).await;
3526 tx.commit_tx().await;
3527
3528 assert_eq!(
3529 future_returns_shortly(async { key_task.await.unwrap() }).await,
3530 Some(TestVal(2)),
3531 "should notify"
3532 );
3533 }
3534
3535 #[tokio::test]
3536 async fn test_wait_key_isolated_tx() {
3537 let module_instance_id = 10;
3538 let key = TestKey(1);
3539 let val = TestVal(2);
3540 let db = MemDatabase::new().into_database();
3541
3542 let key_task = waiter(&db.with_prefix_module_id(module_instance_id).0, TestKey(1)).await;
3543
3544 let mut tx = db.begin_transaction().await;
3545 let mut tx_mod = tx.to_ref_with_prefix_module_id(module_instance_id).0;
3546 tx_mod.insert_new_entry(&key, &val).await;
3547 drop(tx_mod);
3548 tx.commit_tx().await;
3549
3550 assert_eq!(
3551 future_returns_shortly(async { key_task.await.unwrap() }).await,
3552 Some(TestVal(2)),
3553 "should notify"
3554 );
3555 }
3556
3557 #[tokio::test]
3558 async fn test_wait_key_no_transaction() {
3559 let db = MemDatabase::new().into_database();
3560
3561 let key_task = waiter(&db, TestKey(1)).await;
3562 assert_eq!(
3563 future_returns_shortly(async { key_task.await.unwrap() }).await,
3564 None,
3565 "should not notify"
3566 );
3567 }
3568
3569 #[tokio::test]
3570 async fn test_prefix_global_dbtx() {
3571 let module_instance_id = 10;
3572 let db = MemDatabase::new().into_database();
3573
3574 {
3575 let (db, access_token) = db.with_prefix_module_id(module_instance_id);
3577
3578 let mut tx = db.begin_transaction().await;
3579 let mut tx = tx.global_dbtx(access_token);
3580 tx.insert_new_entry(&TestKey(1), &TestVal(1)).await;
3581 tx.commit_tx().await;
3582 }
3583
3584 assert_eq!(
3585 db.begin_transaction_nc().await.get_value(&TestKey(1)).await,
3586 Some(TestVal(1))
3587 );
3588
3589 {
3590 let (db, access_token) = db.with_prefix_module_id(module_instance_id);
3592
3593 let db = db.with_prefix(vec![3, 4]);
3594
3595 let mut tx = db.begin_transaction().await;
3596 let mut tx = tx.global_dbtx(access_token);
3597 tx.insert_new_entry(&TestKey(2), &TestVal(2)).await;
3598 tx.commit_tx().await;
3599 }
3600
3601 assert_eq!(
3602 db.begin_transaction_nc().await.get_value(&TestKey(2)).await,
3603 Some(TestVal(2))
3604 );
3605 }
3606
3607 #[tokio::test]
3608 #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3609 async fn test_prefix_global_dbtx_panics_on_global_db() {
3610 let db = MemDatabase::new().into_database();
3611
3612 let mut tx = db.begin_transaction().await;
3613 let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
3614 }
3615
3616 #[tokio::test]
3617 #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3618 async fn test_prefix_global_dbtx_panics_on_non_module_prefix() {
3619 let db = MemDatabase::new().into_database();
3620
3621 let prefix = vec![3, 4];
3622 let db = db.with_prefix(prefix.clone());
3623
3624 let mut tx = db.begin_transaction().await;
3625 let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&prefix));
3626 }
3627
3628 #[tokio::test]
3629 #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
3630 async fn test_prefix_global_dbtx_panics_on_wrong_access_token() {
3631 let db = MemDatabase::new().into_database();
3632
3633 let prefix = vec![3, 4];
3634 let db = db.with_prefix(prefix.clone());
3635
3636 let mut tx = db.begin_transaction().await;
3637 let _tx = tx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
3638 }
3639}