1use std::any;
106use std::collections::BTreeMap;
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{bail, Context, Result};
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use futures::{Stream, StreamExt};
120use macro_rules_attribute::apply;
121use rand::Rng;
122use serde::Serialize;
123use strum_macros::EnumIter;
124use thiserror::Error;
125use tracing::{debug, error, info, instrument, trace, warn};
126
127use crate::core::ModuleInstanceId;
128use crate::encoding::{Decodable, Encodable};
129use crate::fmt_utils::AbbreviateHexBytes;
130use crate::task::{MaybeSend, MaybeSync};
131use crate::{async_trait_maybe_send, maybe_add_send, timing};
132
133pub mod mem_impl;
134pub mod notifications;
135
136pub use test_utils::*;
137
138use self::notifications::{Notifications, NotifyQueue};
139use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
140
/// First byte of every module-scoped key prefix: `module_instance_id_to_byte_prefix`
/// starts each prefix it builds with this byte.
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
142
/// A value that can be turned into the raw bytes of a database key or key prefix.
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the byte representation used for raw key (prefix) operations.
    fn to_bytes(&self) -> Vec<u8>;
}
146
147pub trait DatabaseRecord: DatabaseKeyPrefix {
150 const DB_PREFIX: u8;
151 const NOTIFY_ON_MODIFY: bool = false;
152 type Key: DatabaseKey + Debug;
153 type Value: DatabaseValue + Debug;
154}
155
/// A key prefix that can be used to look up records of type [`Self::Record`].
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// The record type that a lookup with this prefix yields.
    type Record: DatabaseRecord;
}
161
162impl<Record> DatabaseLookup for Record
164where
165 Record: DatabaseRecord + Debug + Decodable + Encodable,
166{
167 type Record = Record;
168}
169
/// A database key that can be decoded from its raw bytes.
pub trait DatabaseKey: Sized {
    /// Notification flag for this key type; defaults to `false`.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Decodes a key from `data`, using `modules` as the decoder registry.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
}
183
/// Marker trait (no items) required of keys passed to `Database::wait_key_check`
/// and `Database::wait_key_exists`.
pub trait DatabaseKeyWithNotify {}
186
187pub trait DatabaseValue: Sized + Debug {
189 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
190 fn to_bytes(&self) -> Vec<u8>;
191}
192
/// Boxed, pinned stream of raw `(key bytes, value bytes)` pairs, as returned by the
/// `raw_find_by_*` operations.
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
194
/// Zero-sized marker relating two lifetimes: the type `&'small &'big ()` is only
/// well-formed when `'big` outlives `'small`. Used in the closure signature of
/// `Database::autocommit`.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
199
/// Error returned by `Database::autocommit`.
#[derive(Debug, Error)]
pub enum AutocommitError<E> {
    /// Committing failed and the configured maximum number of attempts was reached.
    #[error("Commit Failed: {last_error}")]
    CommitFailed {
        /// Number of attempts made, including the failing one.
        attempts: usize,
        /// Error returned by the last commit attempt.
        last_error: anyhow::Error,
    },
    /// The user-supplied closure returned an error; the transaction was not committed.
    #[error("Closure error: {error}")]
    ClosureError {
        /// Number of attempts made, including the one in which the closure failed.
        attempts: usize,
        /// Error returned by the closure.
        error: E,
    },
}
225
/// Lowest-level database interface: hands out raw transactions and supports
/// checkpointing.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Transaction type handed out by this database.
    type Transaction<'a>: IRawDatabaseTransaction + Debug;

    /// Starts a new raw transaction borrowing from `self`.
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;

    /// Checkpoints the database to `backup_path`; the exact semantics are up to the
    /// implementation.
    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
}
245
246#[apply(async_trait_maybe_send!)]
247impl<T> IRawDatabase for Box<T>
248where
249 T: IRawDatabase,
250{
251 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
252
253 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
254 (**self).begin_transaction().await
255 }
256
257 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
258 (**self).checkpoint(backup_path)
259 }
260}
261
262pub trait IRawDatabaseExt: IRawDatabase + Sized {
264 fn into_database(self) -> Database {
268 Database::new(self, ModuleRegistry::default())
269 }
270}
271
272impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
273
274impl<T> From<T> for Database
275where
276 T: IRawDatabase,
277{
278 fn from(raw: T) -> Self {
279 Self::new(raw, ModuleRegistry::default())
280 }
281}
282
/// Object-safe database interface used behind [`Database`].
#[apply(async_trait_maybe_send!)]
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Starts a new transaction borrowing from `self`.
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Registers interest in `key`; the base implementation awaits
    /// `Notifications::register` for that key.
    async fn register(&self, key: &[u8]);
    /// Signals `key`; the base implementation forwards to `Notifications::notify`.
    async fn notify(&self, key: &[u8]);

    /// Whether this database is a global (non-isolated) view.
    fn is_global(&self) -> bool;

    /// Checkpoints the database to `backup_path`.
    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
}
301
302#[apply(async_trait_maybe_send!)]
303impl<T> IDatabase for Arc<T>
304where
305 T: IDatabase + ?Sized,
306{
307 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
308 (**self).begin_transaction().await
309 }
310 async fn register(&self, key: &[u8]) {
311 (**self).register(key).await;
312 }
313 async fn notify(&self, key: &[u8]) {
314 (**self).notify(key).await;
315 }
316
317 fn is_global(&self) -> bool {
318 (**self).is_global()
319 }
320
321 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
322 (**self).checkpoint(backup_path)
323 }
324}
325
/// Root `IDatabase` implementation: a raw database plus the notification registry
/// shared with the transactions it creates.
struct BaseDatabase<RawDatabase> {
    /// Shared with every `BaseDatabaseTransaction` started from this database.
    notifications: Arc<Notifications>,
    /// The wrapped raw database.
    raw: RawDatabase,
}
333
334impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 f.write_str("BaseDatabase")
337 }
338}
339
340#[apply(async_trait_maybe_send!)]
341impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
342 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
343 Box::new(BaseDatabaseTransaction::new(
344 self.raw.begin_transaction().await,
345 self.notifications.clone(),
346 ))
347 }
348 async fn register(&self, key: &[u8]) {
349 self.notifications.register(key).await;
350 }
351 async fn notify(&self, key: &[u8]) {
352 self.notifications.notify(key);
353 }
354
355 fn is_global(&self) -> bool {
356 true
357 }
358
359 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
360 self.raw.checkpoint(backup_path)
361 }
362}
363
/// Cheaply clonable database handle: a shared `IDatabase` plus the decoder registry
/// used to decode values read through it.
#[derive(Clone, Debug)]
pub struct Database {
    /// The underlying (possibly prefixed) database.
    inner: Arc<dyn IDatabase + 'static>,
    /// Decoders handed to every transaction started from this handle.
    module_decoders: ModuleDecoderRegistry,
}
374
375impl Database {
376 pub fn strong_count(&self) -> usize {
377 Arc::strong_count(&self.inner)
378 }
379
380 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
381 self.inner
382 }
383}
384
385impl Database {
386 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
391 let inner = BaseDatabase {
392 raw,
393 notifications: Arc::new(Notifications::new()),
394 };
395 Self::new_from_arc(
396 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
397 module_decoders,
398 )
399 }
400
401 pub fn new_from_arc(
403 inner: Arc<dyn IDatabase + 'static>,
404 module_decoders: ModuleDecoderRegistry,
405 ) -> Self {
406 Self {
407 inner,
408 module_decoders,
409 }
410 }
411
412 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
414 Self {
415 inner: Arc::new(PrefixDatabase {
416 inner: self.inner.clone(),
417 global_dbtx_access_token: None,
418 prefix,
419 }),
420 module_decoders: self.module_decoders.clone(),
421 }
422 }
423
424 pub fn with_prefix_module_id(
428 &self,
429 module_instance_id: ModuleInstanceId,
430 ) -> (Self, GlobalDBTxAccessToken) {
431 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
432 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
433 (
434 Self {
435 inner: Arc::new(PrefixDatabase {
436 inner: self.inner.clone(),
437 global_dbtx_access_token: Some(global_dbtx_access_token),
438 prefix,
439 }),
440 module_decoders: self.module_decoders.clone(),
441 },
442 global_dbtx_access_token,
443 )
444 }
445
446 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
447 Self {
448 inner: self.inner.clone(),
449 module_decoders,
450 }
451 }
452
453 pub fn is_global(&self) -> bool {
455 self.inner.is_global()
456 }
457
458 pub fn ensure_global(&self) -> Result<()> {
460 if !self.is_global() {
461 bail!("Database instance not global");
462 }
463
464 Ok(())
465 }
466
467 pub fn ensure_isolated(&self) -> Result<()> {
469 if self.is_global() {
470 bail!("Database instance not isolated");
471 }
472
473 Ok(())
474 }
475
476 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
478 where
479 's: 'tx,
480 {
481 DatabaseTransaction::<Committable>::new(
482 self.inner.begin_transaction().await,
483 self.module_decoders.clone(),
484 )
485 }
486
487 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
489 where
490 's: 'tx,
491 {
492 self.begin_transaction().await.into_nc()
493 }
494
495 pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
496 self.inner.checkpoint(backup_path)
497 }
498
499 pub async fn autocommit<'s, 'dbtx, F, T, E>(
527 &'s self,
528 tx_fn: F,
529 max_attempts: Option<usize>,
530 ) -> Result<T, AutocommitError<E>>
531 where
532 's: 'dbtx,
533 for<'r, 'o> F: Fn(
534 &'r mut DatabaseTransaction<'o>,
535 PhantomBound<'dbtx, 'o>,
536 ) -> BoxFuture<'r, Result<T, E>>,
537 {
538 assert_ne!(max_attempts, Some(0));
539 let mut curr_attempts: usize = 0;
540
541 loop {
542 curr_attempts = curr_attempts
547 .checked_add(1)
548 .expect("db autocommit attempt counter overflowed");
549
550 let mut dbtx = self.begin_transaction().await;
551
552 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
553 let val = match tx_fn_res {
554 Ok(val) => val,
555 Err(err) => {
556 dbtx.ignore_uncommitted();
557 return Err(AutocommitError::ClosureError {
558 attempts: curr_attempts,
559 error: err,
560 });
561 }
562 };
563
564 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
565
566 match dbtx.commit_tx_result().await {
567 Ok(()) => {
568 return Ok(val);
569 }
570 Err(err) => {
571 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
572 warn!(
573 target: LOG_DB,
574 curr_attempts,
575 ?err,
576 "Database commit failed in an autocommit block - terminating"
577 );
578 return Err(AutocommitError::CommitFailed {
579 attempts: curr_attempts,
580 last_error: err,
581 });
582 }
583
584 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
585 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
586 warn!(
587 target: LOG_DB,
588 curr_attempts,
589 %err,
590 delay_ms = %delay,
591 "Database commit failed in an autocommit block - retrying"
592 );
593 crate::runtime::sleep(Duration::from_millis(delay)).await;
594 }
595 }
596 }
597 }
598
599 pub async fn wait_key_check<'a, K, T>(
604 &'a self,
605 key: &K,
606 checker: impl Fn(Option<K::Value>) -> Option<T>,
607 ) -> (T, DatabaseTransaction<'a, Committable>)
608 where
609 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
610 {
611 let key_bytes = key.to_bytes();
612 loop {
613 let notify = self.inner.register(&key_bytes);
615
616 let mut tx = self.inner.begin_transaction().await;
618
619 let maybe_value_bytes = tx
620 .raw_get_bytes(&key_bytes)
621 .await
622 .expect("Unrecoverable error when reading from database")
623 .map(|value_bytes| {
624 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
625 });
626
627 if let Some(value) = checker(maybe_value_bytes) {
628 return (
629 value,
630 DatabaseTransaction::new(tx, self.module_decoders.clone()),
631 );
632 }
633
634 notify.await;
636 }
639 }
640
641 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
643 where
644 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
645 {
646 self.wait_key_check(key, std::convert::identity).await.0
647 }
648}
649
650fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
651 let mut prefix = vec![MODULE_GLOBAL_PREFIX];
652 module_instance_id
653 .consensus_encode(&mut prefix)
654 .expect("Error encoding module instance id as prefix");
655 prefix
656}
657
/// Database view that prepends `prefix` to every key before delegating to `inner`.
#[derive(Clone, Debug)]
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    /// Bytes prepended to every key.
    prefix: Vec<u8>,
    /// When set, the view reports itself as non-global and its transactions require
    /// this token in `global_dbtx`.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// The wrapped database.
    inner: Inner,
}
669
670impl<Inner> PrefixDatabase<Inner>
671where
672 Inner: Debug,
673{
674 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
678 let mut full_key = self.prefix.clone();
679 full_key.extend_from_slice(key);
680 full_key
681 }
682}
683
684#[apply(async_trait_maybe_send!)]
685impl<Inner> IDatabase for PrefixDatabase<Inner>
686where
687 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
688{
689 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
690 Box::new(PrefixDatabaseTransaction {
691 inner: self.inner.begin_transaction().await,
692 global_dbtx_access_token: self.global_dbtx_access_token,
693 prefix: self.prefix.clone(),
694 })
695 }
696 async fn register(&self, key: &[u8]) {
697 self.inner.register(&self.get_full_key(key)).await;
698 }
699
700 async fn notify(&self, key: &[u8]) {
701 self.inner.notify(&self.get_full_key(key)).await;
702 }
703
704 fn is_global(&self) -> bool {
705 if self.global_dbtx_access_token.is_some() {
706 false
707 } else {
708 self.inner.is_global()
709 }
710 }
711
712 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
713 self.inner.checkpoint(backup_path)
714 }
715}
716
/// Transaction wrapper that prepends `prefix` to every key before delegating to
/// `inner`, and strips it again from the keys of streamed results.
#[derive(Debug)]
struct PrefixDatabaseTransaction<Inner> {
    /// The wrapped transaction.
    inner: Inner,
    /// When set, `global_dbtx` requires exactly this token and then exposes `inner`.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// Bytes prepended to every key.
    prefix: Vec<u8>,
}
727
728impl<Inner> PrefixDatabaseTransaction<Inner> {
729 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
733 let mut full_key = self.prefix.clone();
734 full_key.extend_from_slice(key);
735 full_key
736 }
737
738 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
739 Range {
740 start: self.get_full_key(range.start),
741 end: self.get_full_key(range.end),
742 }
743 }
744
745 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
746 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
747 }
748}
749
750#[apply(async_trait_maybe_send!)]
751impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
752where
753 Inner: IDatabaseTransaction,
754{
755 async fn commit_tx(&mut self) -> Result<()> {
756 self.inner.commit_tx().await
757 }
758
759 fn is_global(&self) -> bool {
760 if self.global_dbtx_access_token.is_some() {
761 false
762 } else {
763 self.inner.is_global()
764 }
765 }
766
767 fn global_dbtx(
768 &mut self,
769 access_token: GlobalDBTxAccessToken,
770 ) -> &mut dyn IDatabaseTransaction {
771 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
772 assert_eq!(
773 access_token, self_global_dbtx_access_token,
774 "Invalid access key used to access global_dbtx"
775 );
776 &mut self.inner
777 } else {
778 self.inner.global_dbtx(access_token)
779 }
780 }
781}
782
783#[apply(async_trait_maybe_send!)]
784impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
785where
786 Inner: IDatabaseTransactionOpsCore,
787{
788 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
789 let key = self.get_full_key(key);
790 self.inner.raw_insert_bytes(&key, value).await
791 }
792
793 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794 let key = self.get_full_key(key);
795 self.inner.raw_get_bytes(&key).await
796 }
797
798 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
799 let key = self.get_full_key(key);
800 self.inner.raw_remove_entry(&key).await
801 }
802
803 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
804 let key = self.get_full_key(key_prefix);
805 let stream = self.inner.raw_find_by_prefix(&key).await?;
806 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
807 }
808
809 async fn raw_find_by_prefix_sorted_descending(
810 &mut self,
811 key_prefix: &[u8],
812 ) -> Result<PrefixStream<'_>> {
813 let key = self.get_full_key(key_prefix);
814 let stream = self
815 .inner
816 .raw_find_by_prefix_sorted_descending(&key)
817 .await?;
818 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
819 }
820
821 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
822 let range = self.get_full_range(range);
823 let stream = self
824 .inner
825 .raw_find_by_range(Range {
826 start: &range.start,
827 end: &range.end,
828 })
829 .await?;
830 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
831 }
832
833 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
834 let key = self.get_full_key(key_prefix);
835 self.inner.raw_remove_by_prefix(&key).await
836 }
837}
838
839#[apply(async_trait_maybe_send!)]
840impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
841where
842 Inner: IDatabaseTransactionOps,
843{
844 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
845 self.inner.rollback_tx_to_savepoint().await
846 }
847
848 async fn set_tx_savepoint(&mut self) -> Result<()> {
849 self.set_tx_savepoint().await
850 }
851}
852
/// Raw, byte-level operations available on every database transaction.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Stores `value` under `key`; the returned option carries previously stored
    /// bytes, if any (the typed layer decodes it as the old value).
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Reads the bytes stored under `key`, if any.
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Removes the entry under `key`; the returned option carries the removed bytes,
    /// if any.
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Streams all entries whose key starts with `key_prefix`.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;

    /// Like `raw_find_by_prefix`, but in descending order.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>>;

    /// Streams all entries whose key lies in `range`.
    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;

    /// Removes all entries whose key starts with `key_prefix`.
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
}
885
886#[apply(async_trait_maybe_send!)]
887impl<T> IDatabaseTransactionOpsCore for Box<T>
888where
889 T: IDatabaseTransactionOpsCore + ?Sized,
890{
891 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
892 (**self).raw_insert_bytes(key, value).await
893 }
894
895 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
896 (**self).raw_get_bytes(key).await
897 }
898
899 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
900 (**self).raw_remove_entry(key).await
901 }
902
903 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
904 (**self).raw_find_by_prefix(key_prefix).await
905 }
906
907 async fn raw_find_by_prefix_sorted_descending(
908 &mut self,
909 key_prefix: &[u8],
910 ) -> Result<PrefixStream<'_>> {
911 (**self)
912 .raw_find_by_prefix_sorted_descending(key_prefix)
913 .await
914 }
915
916 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
917 (**self).raw_find_by_range(range).await
918 }
919
920 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
921 (**self).raw_remove_by_prefix(key_prefix).await
922 }
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for &mut T
927where
928 T: IDatabaseTransactionOpsCore + ?Sized,
929{
930 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
931 (**self).raw_insert_bytes(key, value).await
932 }
933
934 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
935 (**self).raw_get_bytes(key).await
936 }
937
938 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
939 (**self).raw_remove_entry(key).await
940 }
941
942 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
943 (**self).raw_find_by_prefix(key_prefix).await
944 }
945
946 async fn raw_find_by_prefix_sorted_descending(
947 &mut self,
948 key_prefix: &[u8],
949 ) -> Result<PrefixStream<'_>> {
950 (**self)
951 .raw_find_by_prefix_sorted_descending(key_prefix)
952 .await
953 }
954
955 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
956 (**self).raw_find_by_range(range).await
957 }
958
959 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
960 (**self).raw_remove_by_prefix(key_prefix).await
961 }
962}
963
/// Raw operations plus savepoint handling.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
    /// Records a savepoint in the transaction.
    async fn set_tx_savepoint(&mut self) -> Result<()>;

    /// Rolls the transaction back to the savepoint.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
}
983
984#[apply(async_trait_maybe_send!)]
985impl<T> IDatabaseTransactionOps for Box<T>
986where
987 T: IDatabaseTransactionOps + ?Sized,
988{
989 async fn set_tx_savepoint(&mut self) -> Result<()> {
990 (**self).set_tx_savepoint().await
991 }
992
993 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
994 (**self).rollback_tx_to_savepoint().await
995 }
996}
997
998#[apply(async_trait_maybe_send!)]
999impl<T> IDatabaseTransactionOps for &mut T
1000where
1001 T: IDatabaseTransactionOps + ?Sized,
1002{
1003 async fn set_tx_savepoint(&mut self) -> Result<()> {
1004 (**self).set_tx_savepoint().await
1005 }
1006
1007 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1008 (**self).rollback_tx_to_savepoint().await
1009 }
1010}
1011
/// Typed counterpart of `IDatabaseTransactionOpsCore`: keys and values are encoded
/// to bytes on the way in and decoded on the way out.
///
/// The blanket implementation in this file panics when the underlying raw operation
/// returns an error.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCoreTyped<'a> {
    /// Reads and decodes the value stored under `key`, if any.
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Stores `value` under `key` and returns the decoded previous value, if any.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stores `value` under `key`, expecting that no value was stored there before.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Streams the decoded entries whose key lies in `key_range`.
    async fn find_by_range<K>(
        &mut self,
        key_range: Range<K>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Streams the decoded entries whose key starts with `key_prefix`.
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Like `find_by_prefix`, but in descending order.
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Removes the entry under `key` and returns its decoded value, if any.
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Removes all entries whose key starts with `key_prefix`.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync;
}
1087
1088#[apply(async_trait_maybe_send!)]
1091impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1092where
1093 T: IDatabaseTransactionOpsCore + WithDecoders,
1094{
1095 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1096 where
1097 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1098 {
1099 let key_bytes = key.to_bytes();
1100 let raw = self
1101 .raw_get_bytes(&key_bytes)
1102 .await
1103 .expect("Unrecoverable error occurred while reading and entry from the database");
1104 raw.map(|value_bytes| {
1105 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1106 })
1107 }
1108
1109 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1110 where
1111 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1112 K::Value: MaybeSend + MaybeSync,
1113 {
1114 let key_bytes = key.to_bytes();
1115 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1116 .await
1117 .expect("Unrecoverable error occurred while inserting entry into the database")
1118 .map(|value_bytes| {
1119 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1120 })
1121 }
1122
1123 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1124 where
1125 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1126 K::Value: MaybeSend + MaybeSync,
1127 {
1128 if let Some(prev) = self.insert_entry(key, value).await {
1129 debug_assert!(
1130 false,
1131 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1132 );
1133 warn!(
1134 target: LOG_DB,
1135 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1136 );
1137 }
1138 }
1139
1140 async fn find_by_range<K>(
1141 &mut self,
1142 key_range: Range<K>,
1143 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1144 where
1145 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1146 K::Value: MaybeSend + MaybeSync,
1147 {
1148 let decoders = self.decoders().clone();
1149 Box::pin(
1150 self.raw_find_by_range(Range {
1151 start: &key_range.start.to_bytes(),
1152 end: &key_range.end.to_bytes(),
1153 })
1154 .await
1155 .expect("Unrecoverable error occurred while listing entries from the database")
1156 .map(move |(key_bytes, value_bytes)| {
1157 let key = decode_key_expect(&key_bytes, &decoders);
1158 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1159 (key, value)
1160 }),
1161 )
1162 }
1163
1164 async fn find_by_prefix<KP>(
1165 &mut self,
1166 key_prefix: &KP,
1167 ) -> Pin<
1168 Box<
1169 maybe_add_send!(
1170 dyn Stream<
1171 Item = (
1172 KP::Record,
1173 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1174 ),
1175 > + '_
1176 ),
1177 >,
1178 >
1179 where
1180 KP: DatabaseLookup + MaybeSend + MaybeSync,
1181 KP::Record: DatabaseKey,
1182 {
1183 let decoders = self.decoders().clone();
1184 Box::pin(
1185 self.raw_find_by_prefix(&key_prefix.to_bytes())
1186 .await
1187 .expect("Unrecoverable error occurred while listing entries from the database")
1188 .map(move |(key_bytes, value_bytes)| {
1189 let key = decode_key_expect(&key_bytes, &decoders);
1190 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1191 (key, value)
1192 }),
1193 )
1194 }
1195
1196 async fn find_by_prefix_sorted_descending<KP>(
1197 &mut self,
1198 key_prefix: &KP,
1199 ) -> Pin<
1200 Box<
1201 maybe_add_send!(
1202 dyn Stream<
1203 Item = (
1204 KP::Record,
1205 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1206 ),
1207 > + '_
1208 ),
1209 >,
1210 >
1211 where
1212 KP: DatabaseLookup + MaybeSend + MaybeSync,
1213 KP::Record: DatabaseKey,
1214 {
1215 let decoders = self.decoders().clone();
1216 Box::pin(
1217 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1218 .await
1219 .expect("Unrecoverable error occurred while listing entries from the database")
1220 .map(move |(key_bytes, value_bytes)| {
1221 let key = decode_key_expect(&key_bytes, &decoders);
1222 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1223 (key, value)
1224 }),
1225 )
1226 }
1227 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1228 where
1229 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1230 {
1231 let key_bytes = key.to_bytes();
1232 self.raw_remove_entry(&key_bytes)
1233 .await
1234 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1235 .map(|value_bytes| {
1236 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1237 })
1238 }
1239 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1240 where
1241 KP: DatabaseLookup + MaybeSend + MaybeSync,
1242 {
1243 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1244 .await
1245 .expect("Unrecoverable error when removing entries from the database");
1246 }
1247}
1248
/// Gives access to the decoder registry used when decoding keys and values.
pub trait WithDecoders {
    /// Returns the decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
}
1254
/// Transaction of an `IRawDatabase`; committing consumes it.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commits the transaction, consuming it.
    async fn commit_tx(self) -> Result<()>;
}
1260
/// Object-safe transaction interface used behind `DatabaseTransaction`.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
    /// Commits the transaction. Unlike `IRawDatabaseTransaction::commit_tx`, this
    /// takes `&mut self`.
    async fn commit_tx(&mut self) -> Result<()>;

    /// Whether this transaction is a global (non-isolated) view.
    fn is_global(&self) -> bool;

    /// Returns the transaction that `access_token` grants access to. Prefixed
    /// transactions check the token against their own; the base transaction panics.
    #[doc(hidden)]
    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
        -> &mut dyn IDatabaseTransaction;
}
1280
1281#[apply(async_trait_maybe_send!)]
1282impl<T> IDatabaseTransaction for Box<T>
1283where
1284 T: IDatabaseTransaction + ?Sized,
1285{
1286 async fn commit_tx(&mut self) -> Result<()> {
1287 (**self).commit_tx().await
1288 }
1289
1290 fn is_global(&self) -> bool {
1291 (**self).is_global()
1292 }
1293
1294 fn global_dbtx(
1295 &mut self,
1296 access_token: GlobalDBTxAccessToken,
1297 ) -> &mut dyn IDatabaseTransaction {
1298 (**self).global_dbtx(access_token)
1299 }
1300}
1301
1302#[apply(async_trait_maybe_send!)]
1303impl<'a, T> IDatabaseTransaction for &'a mut T
1304where
1305 T: IDatabaseTransaction + ?Sized,
1306{
1307 async fn commit_tx(&mut self) -> Result<()> {
1308 (**self).commit_tx().await
1309 }
1310
1311 fn is_global(&self) -> bool {
1312 (**self).is_global()
1313 }
1314
1315 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1316 (**self).global_dbtx(access_key)
1317 }
1318}
1319
/// Root transaction: a raw transaction plus the notification keys collected while
/// writing through it.
struct BaseDatabaseTransaction<Tx> {
    /// The raw transaction; taken (set to `None`) by `commit_tx`.
    raw: Option<Tx>,
    /// Keys touched by inserts and single-entry removals; taken and submitted to
    /// `notifications` by `commit_tx`.
    notify_queue: Option<NotifyQueue>,
    /// Registry the queued keys are submitted to after a successful commit.
    notifications: Arc<Notifications>,
}
1328
1329impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1330where
1331 Tx: fmt::Debug,
1332{
1333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1334 f.write_fmt(format_args!(
1335 "BaseDatabaseTransaction{{ raw={:?} }}",
1336 self.raw
1337 ))
1338 }
1339}
1340impl<Tx> BaseDatabaseTransaction<Tx>
1341where
1342 Tx: IRawDatabaseTransaction,
1343{
1344 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1345 Self {
1346 raw: Some(dbtx),
1347 notifications,
1348 notify_queue: Some(NotifyQueue::new()),
1349 }
1350 }
1351
1352 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1353 self.notify_queue
1354 .as_mut()
1355 .context("can not call add_notification_key after commit")?
1356 .add(&key);
1357 Ok(())
1358 }
1359}
1360
1361#[apply(async_trait_maybe_send!)]
1362impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1363 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1364 self.add_notification_key(key)?;
1365 self.raw
1366 .as_mut()
1367 .context("Cannot insert into already consumed transaction")?
1368 .raw_insert_bytes(key, value)
1369 .await
1370 }
1371
1372 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1373 self.raw
1374 .as_mut()
1375 .context("Cannot retrieve from already consumed transaction")?
1376 .raw_get_bytes(key)
1377 .await
1378 }
1379
1380 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1381 self.add_notification_key(key)?;
1382 self.raw
1383 .as_mut()
1384 .context("Cannot remove from already consumed transaction")?
1385 .raw_remove_entry(key)
1386 .await
1387 }
1388
1389 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1390 self.raw
1391 .as_mut()
1392 .context("Cannot retrieve from already consumed transaction")?
1393 .raw_find_by_range(key_range)
1394 .await
1395 }
1396
1397 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1398 self.raw
1399 .as_mut()
1400 .context("Cannot retrieve from already consumed transaction")?
1401 .raw_find_by_prefix(key_prefix)
1402 .await
1403 }
1404
1405 async fn raw_find_by_prefix_sorted_descending(
1406 &mut self,
1407 key_prefix: &[u8],
1408 ) -> Result<PrefixStream<'_>> {
1409 self.raw
1410 .as_mut()
1411 .context("Cannot retrieve from already consumed transaction")?
1412 .raw_find_by_prefix_sorted_descending(key_prefix)
1413 .await
1414 }
1415
1416 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1417 self.raw
1418 .as_mut()
1419 .context("Cannot remove from already consumed transaction")?
1420 .raw_remove_by_prefix(key_prefix)
1421 .await
1422 }
1423}
1424
1425#[apply(async_trait_maybe_send!)]
1426impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1427 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1428 self.raw
1429 .as_mut()
1430 .context("Cannot rollback to a savepoint on an already consumed transaction")?
1431 .rollback_tx_to_savepoint()
1432 .await?;
1433 Ok(())
1434 }
1435
1436 async fn set_tx_savepoint(&mut self) -> Result<()> {
1437 self.raw
1438 .as_mut()
1439 .context("Cannot set a tx savepoint on an already consumed transaction")?
1440 .set_tx_savepoint()
1441 .await?;
1442 Ok(())
1443 }
1444}
1445
1446#[apply(async_trait_maybe_send!)]
1447impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1448 for BaseDatabaseTransaction<Tx>
1449{
1450 async fn commit_tx(&mut self) -> Result<()> {
1451 self.raw
1452 .take()
1453 .context("Cannot commit an already committed transaction")?
1454 .commit_tx()
1455 .await?;
1456 self.notifications.submit_queue(
1457 &self
1458 .notify_queue
1459 .take()
1460 .expect("commit must be called only once"),
1461 );
1462 Ok(())
1463 }
1464
1465 fn is_global(&self) -> bool {
1466 true
1467 }
1468
1469 fn global_dbtx(
1470 &mut self,
1471 _access_token: GlobalDBTxAccessToken,
1472 ) -> &mut dyn IDatabaseTransaction {
1473 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1474 }
1475}
1476
/// Bookkeeping used to report transactions that performed writes but were
/// dropped without a commit (see the `Drop` implementation).
#[derive(Clone)]
struct CommitTracker {
    /// Set to `true` when a commit is requested on the owning transaction.
    is_committed: bool,
    /// Set to `true` when a write operation is issued through the transaction.
    has_writes: bool,
    /// When `true`, dropping with uncommitted writes is logged at trace level
    /// instead of producing a warning.
    ignore_uncommitted: bool,
}
1488
1489impl Drop for CommitTracker {
1490 fn drop(&mut self) {
1491 if self.has_writes && !self.is_committed {
1492 if self.ignore_uncommitted {
1493 trace!(
1494 target: LOG_DB,
1495 "DatabaseTransaction has writes and has not called commit, but that's expected."
1496 );
1497 } else {
1498 warn!(
1499 target: LOG_DB,
1500 location = ?backtrace::Backtrace::new(),
1501 "DatabaseTransaction has writes and has not called commit."
1502 );
1503 }
1504 }
1505 }
1506}
1507
/// Either an owned `T` or a mutable borrow of one.
///
/// Both variants dereference to the same `T`, so users can read and mutate
/// the value without caring where it lives.
enum MaybeRef<'a, T> {
    /// The value is stored inline.
    Owned(T),
    /// The value lives elsewhere and is mutably borrowed for `'a`.
    Borrowed(&'a mut T),
}

impl<'a, T> ops::Deref for MaybeRef<'a, T> {
    type Target = T;

    /// Shared access to the underlying value, regardless of the variant.
    fn deref(&self) -> &Self::Target {
        match *self {
            MaybeRef::Owned(ref value) => value,
            MaybeRef::Borrowed(ref reference) => &**reference,
        }
    }
}

impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
    /// Exclusive access to the underlying value, regardless of the variant.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match *self {
            MaybeRef::Owned(ref mut value) => value,
            MaybeRef::Borrowed(ref mut reference) => &mut **reference,
        }
    }
}
1532
/// Capability marker for a [`DatabaseTransaction`] that may be committed;
/// `commit_tx` and `commit_tx_result` are only implemented for this marker.
pub struct Committable;

/// Capability marker for a [`DatabaseTransaction`] without commit methods.
/// This is the default capability.
pub struct NonCommittable;
/// A typed database transaction wrapping a dynamically dispatched
/// [`IDatabaseTransaction`].
///
/// `Cap` is a compile-time marker ([`Committable`] or [`NonCommittable`]).
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying (possibly prefixed or borrowed) transaction.
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoders handed out through `WithDecoders`.
    decoders: ModuleDecoderRegistry,
    /// Commit bookkeeping; borrowed when this value was derived from another
    /// transaction via one of the `to_ref*` methods.
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Callbacks run after a successful commit; owned or borrowed like
    /// `commit_tracker`.
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Zero-sized carrier of the `Cap` type parameter.
    capability: marker::PhantomData<Cap>,
}
1553
1554impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1555 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1556 f.write_fmt(format_args!(
1557 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1558 self.tx, self.decoders
1559 ))
1560 }
1561}
1562
1563impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1564 fn decoders(&self) -> &ModuleDecoderRegistry {
1565 &self.decoders
1566 }
1567}
1568
1569#[instrument(level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1570fn decode_value<V: DatabaseValue>(
1571 value_bytes: &[u8],
1572 decoders: &ModuleDecoderRegistry,
1573) -> Result<V, DecodingError> {
1574 trace!(
1575 bytes = %AbbreviateHexBytes(value_bytes),
1576 "decoding value",
1577 );
1578 V::from_bytes(value_bytes, decoders)
1579}
1580
1581fn decode_value_expect<V: DatabaseValue>(
1582 value_bytes: &[u8],
1583 decoders: &ModuleDecoderRegistry,
1584 key_bytes: &[u8],
1585) -> V {
1586 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1587 panic!(
1588 "Unrecoverable decoding DatabaseValue as {}; err={}, bytes={}; key_bytes={}",
1589 any::type_name::<V>(),
1590 err,
1591 AbbreviateHexBytes(value_bytes),
1592 AbbreviateHexBytes(key_bytes),
1593 )
1594 })
1595}
1596
1597fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1598 trace!(
1599 bytes = %AbbreviateHexBytes(key_bytes),
1600 "decoding key",
1601 );
1602 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1603 panic!(
1604 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1605 any::type_name::<K>(),
1606 err,
1607 AbbreviateHexBytes(key_bytes)
1608 )
1609 })
1610}
1611
1612impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1613 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1615 DatabaseTransaction {
1616 tx: self.tx,
1617 decoders: self.decoders,
1618 commit_tracker: self.commit_tracker,
1619 on_commit_hooks: self.on_commit_hooks,
1620 capability: PhantomData::<NonCommittable>,
1621 }
1622 }
1623
1624 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1626 where
1627 's: 'a,
1628 {
1629 self.to_ref().into_nc()
1630 }
1631
1632 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1634 where
1635 'tx: 'a,
1636 {
1637 DatabaseTransaction {
1638 tx: Box::new(PrefixDatabaseTransaction {
1639 inner: self.tx,
1640 global_dbtx_access_token: None,
1641 prefix,
1642 }),
1643 decoders: self.decoders,
1644 commit_tracker: self.commit_tracker,
1645 on_commit_hooks: self.on_commit_hooks,
1646 capability: self.capability,
1647 }
1648 }
1649
1650 pub fn with_prefix_module_id<'a: 'tx>(
1654 self,
1655 module_instance_id: ModuleInstanceId,
1656 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1657 where
1658 'tx: 'a,
1659 {
1660 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1661 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1662 (
1663 DatabaseTransaction {
1664 tx: Box::new(PrefixDatabaseTransaction {
1665 inner: self.tx,
1666 global_dbtx_access_token: Some(global_dbtx_access_token),
1667 prefix,
1668 }),
1669 decoders: self.decoders,
1670 commit_tracker: self.commit_tracker,
1671 on_commit_hooks: self.on_commit_hooks,
1672 capability: self.capability,
1673 },
1674 global_dbtx_access_token,
1675 )
1676 }
1677
1678 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1680 where
1681 's: 'a,
1682 {
1683 let decoders = self.decoders.clone();
1684
1685 DatabaseTransaction {
1686 tx: Box::new(&mut self.tx),
1687 decoders,
1688 commit_tracker: match self.commit_tracker {
1689 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1690 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1691 },
1692 on_commit_hooks: match self.on_commit_hooks {
1693 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1694 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1695 },
1696 capability: self.capability,
1697 }
1698 }
1699
1700 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1702 where
1703 'tx: 'a,
1704 {
1705 DatabaseTransaction {
1706 tx: Box::new(PrefixDatabaseTransaction {
1707 inner: &mut self.tx,
1708 global_dbtx_access_token: None,
1709 prefix,
1710 }),
1711 decoders: self.decoders.clone(),
1712 commit_tracker: match self.commit_tracker {
1713 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1714 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1715 },
1716 on_commit_hooks: match self.on_commit_hooks {
1717 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1718 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1719 },
1720 capability: self.capability,
1721 }
1722 }
1723
1724 pub fn to_ref_with_prefix_module_id<'a>(
1725 &'a mut self,
1726 module_instance_id: ModuleInstanceId,
1727 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1728 where
1729 'tx: 'a,
1730 {
1731 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1732 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1733 (
1734 DatabaseTransaction {
1735 tx: Box::new(PrefixDatabaseTransaction {
1736 inner: &mut self.tx,
1737 global_dbtx_access_token: Some(global_dbtx_access_token),
1738 prefix,
1739 }),
1740 decoders: self.decoders.clone(),
1741 commit_tracker: match self.commit_tracker {
1742 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1743 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1744 },
1745 on_commit_hooks: match self.on_commit_hooks {
1746 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1747 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1748 },
1749 capability: self.capability,
1750 },
1751 global_dbtx_access_token,
1752 )
1753 }
1754
1755 pub fn is_global(&self) -> bool {
1757 self.tx.is_global()
1758 }
1759
1760 pub fn ensure_global(&self) -> Result<()> {
1762 if !self.is_global() {
1763 bail!("Database instance not global");
1764 }
1765
1766 Ok(())
1767 }
1768
1769 pub fn ensure_isolated(&self) -> Result<()> {
1771 if self.is_global() {
1772 bail!("Database instance not isolated");
1773 }
1774
1775 Ok(())
1776 }
1777
1778 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1780 self.commit_tracker.ignore_uncommitted = true;
1781 self
1782 }
1783
1784 pub fn warn_uncommitted(&mut self) -> &mut Self {
1786 self.commit_tracker.ignore_uncommitted = false;
1787 self
1788 }
1789
1790 #[instrument(level = "trace", skip_all)]
1792 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1793 self.on_commit_hooks.push(Box::new(f));
1794 }
1795
1796 pub fn global_dbtx<'a>(
1797 &'a mut self,
1798 access_token: GlobalDBTxAccessToken,
1799 ) -> DatabaseTransaction<'a, Cap>
1800 where
1801 'tx: 'a,
1802 {
1803 let decoders = self.decoders.clone();
1804
1805 DatabaseTransaction {
1806 tx: Box::new(self.tx.global_dbtx(access_token)),
1807 decoders,
1808 commit_tracker: match self.commit_tracker {
1809 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1810 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1811 },
1812 on_commit_hooks: match self.on_commit_hooks {
1813 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1814 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1815 },
1816 capability: self.capability,
1817 }
1818 }
1819}
1820
/// Token required to reach the global transaction from a module-prefixed
/// one; see `DatabaseTransaction::global_dbtx`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GlobalDBTxAccessToken(u32);

impl GlobalDBTxAccessToken {
    /// Derives the token for `prefix`: the sum of the prefix bytes plus 513.
    ///
    /// The value is purely a deterministic function of the prefix.
    fn from_prefix(prefix: &[u8]) -> Self {
        let byte_sum: u32 = prefix.iter().copied().map(u32::from).sum();
        Self(byte_sum + 513)
    }
}
1840
1841impl<'tx> DatabaseTransaction<'tx, Committable> {
1842 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1843 Self {
1844 tx: dbtx,
1845 decoders,
1846 commit_tracker: MaybeRef::Owned(CommitTracker {
1847 is_committed: false,
1848 has_writes: false,
1849 ignore_uncommitted: false,
1850 }),
1851 on_commit_hooks: MaybeRef::Owned(vec![]),
1852 capability: PhantomData,
1853 }
1854 }
1855
1856 pub async fn commit_tx_result(mut self) -> Result<()> {
1857 self.commit_tracker.is_committed = true;
1858 let commit_result = self.tx.commit_tx().await;
1859
1860 if commit_result.is_ok() {
1862 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1863 hook();
1864 }
1865 }
1866
1867 commit_result
1868 }
1869
1870 pub async fn commit_tx(mut self) {
1871 self.commit_tracker.is_committed = true;
1872 self.commit_tx_result()
1873 .await
1874 .expect("Unrecoverable error occurred while committing to the database.");
1875 }
1876}
1877
1878#[apply(async_trait_maybe_send!)]
1879impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1880where
1881 Cap: Send,
1882{
1883 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1884 self.commit_tracker.has_writes = true;
1885 self.tx.raw_insert_bytes(key, value).await
1886 }
1887
1888 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1889 self.tx.raw_get_bytes(key).await
1890 }
1891
1892 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1893 self.tx.raw_remove_entry(key).await
1894 }
1895
1896 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1897 self.tx.raw_find_by_range(key_range).await
1898 }
1899
1900 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1901 self.tx.raw_find_by_prefix(key_prefix).await
1902 }
1903
1904 async fn raw_find_by_prefix_sorted_descending(
1905 &mut self,
1906 key_prefix: &[u8],
1907 ) -> Result<PrefixStream<'_>> {
1908 self.tx
1909 .raw_find_by_prefix_sorted_descending(key_prefix)
1910 .await
1911 }
1912
1913 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1914 self.commit_tracker.has_writes = true;
1915 self.tx.raw_remove_by_prefix(key_prefix).await
1916 }
1917}
// Savepoint operations are only implemented for `Committable` transactions;
// both simply delegate to the wrapped transaction.
#[apply(async_trait_maybe_send!)]
impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
    /// Sets a savepoint on the wrapped transaction.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        self.tx.set_tx_savepoint().await
    }

    /// Rolls the wrapped transaction back to its savepoint.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        self.tx.rollback_tx_to_savepoint().await
    }
}
1928
1929impl<T> DatabaseKeyPrefix for T
1930where
1931 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1932{
1933 fn to_bytes(&self) -> Vec<u8> {
1934 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1935 self.consensus_encode(&mut data)
1936 .expect("Writing to vec is infallible");
1937 data
1938 }
1939}
1940
1941impl<T> DatabaseKey for T
1942where
1943 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1946{
1947 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1948 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1949 if data.is_empty() {
1950 return Err(DecodingError::wrong_length(1, 0));
1952 }
1953
1954 if data[0] != Self::DB_PREFIX {
1955 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1956 }
1957
1958 <Self as crate::encoding::Decodable>::consensus_decode(
1959 &mut std::io::Cursor::new(&data[1..]),
1960 modules,
1961 )
1962 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1963 }
1964}
1965
1966impl<T> DatabaseValue for T
1967where
1968 T: Debug + Encodable + Decodable,
1969{
1970 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1971 T::consensus_decode(&mut std::io::Cursor::new(data), modules)
1972 .map_err(|e| DecodingError::Other(e.0))
1973 }
1974
1975 fn to_bytes(&self) -> Vec<u8> {
1976 let mut bytes = Vec::new();
1977 self.consensus_encode(&mut bytes)
1978 .expect("writing to vec can't fail");
1979 bytes
1980 }
1981}
1982
/// Implements `DatabaseRecord` for a key type.
///
/// Arguments (in this order):
/// * `key` - the key type; it becomes `DatabaseRecord::Key` (as `Self`),
/// * `value` - the type used for `DatabaseRecord::Value`,
/// * `db_prefix` - an expression cast with `as u8` to form `DB_PREFIX`,
/// * `notify_on_modify` (optional) - `true` or `false`; sets
///   `NOTIFY_ON_MODIFY` and, when `true`, also implements
///   `DatabaseKeyWithNotify` for the key.
///
/// A trailing comma is accepted.
#[macro_export]
macro_rules! impl_db_record {
    // Public entry point.
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        $(
            impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // Internal rule: notifications enabled, so add the marker trait.
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // Internal rule: notifications disabled, nothing to generate.
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
2065
/// Implements `DatabaseLookup` for zero or more query-prefix types, each
/// with `Record` set to the given `key` type.
///
/// Usage: `impl_db_lookup!(key = Key, query_prefix = PrefixA, query_prefix = PrefixB)`.
#[macro_export]
macro_rules! impl_db_lookup{
    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
        // One impl per listed `query_prefix`.
        $(
            impl $crate::db::DatabaseLookup for $query_prefix {
                type Record = $key;
            }
        )*
    };
}
2076
/// Version key without a module instance id. The migration code removes an
/// entry stored under this key when it creates the corresponding
/// [`DatabaseVersionKey`] entry.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;

/// Key under which the database version for a given module instance id is
/// stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);

/// A database schema version number.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);
2086
// Both version keys map to a `DatabaseVersion` value and share the
// `DbKeyPrefix::DatabaseVersion` prefix byte.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);

impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2098
2099impl std::fmt::Display for DatabaseVersion {
2100 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2101 write!(f, "{}", self.0)
2102 }
2103}
2104
2105impl DatabaseVersion {
2106 pub fn increment(&self) -> Self {
2107 Self(self.0 + 1)
2108 }
2109}
2110
2111impl std::fmt::Display for DbKeyPrefix {
2112 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2113 write!(f, "{self:?}")
2114 }
2115}
2116
/// Key prefix bytes declared by this module.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the database version records.
    DatabaseVersion = 0x50,
    // NOTE(review): presumably the prefix of client backup records; no use
    // is visible in this part of the file.
    ClientBackup = 0x51,
}
2123
/// Errors produced while decoding database keys and values from bytes.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first byte of the key did not match the expected prefix.
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The key had a different length than expected.
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other decoding failure, carrying the underlying error.
    #[error("Other decoding error: {0}")]
    Other(anyhow::Error),
}
2133
2134impl DecodingError {
2135 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2136 Self::Other(anyhow::Error::from(error))
2137 }
2138
2139 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2140 Self::WrongPrefix { expected, found }
2141 }
2142
2143 pub fn wrong_length(expected: usize, found: usize) -> Self {
2144 Self::WrongLength { expected, found }
2145 }
2146}
2147
/// Collects every `(key, value)` pair found under `$prefix_type` into a
/// `BTreeMap<String, $value_type>` keyed by the hex consensus encoding of
/// the key, and inserts the boxed map into `$map` under `$key_literal`.
///
/// Must be expanded in an async context. `BTreeMap` is referenced
/// unqualified and therefore has to be in scope at the call site; the
/// `.map`/`.collect` calls presumably also require `StreamExt` to be
/// imported there. `$key_type` is accepted but not used by the expansion.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        val,
                    )
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2166
/// Collects the keys of every entry found under `$prefix_type` into a
/// `Vec<$key_type>` (values are discarded) and inserts the boxed vector
/// into `$map` under `$key_literal`.
///
/// Must be expanded in an async context; the `.map`/`.collect` calls
/// presumably require `StreamExt` to be in scope at the call site.
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, _)| key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2180
/// Signature of a single migration step: a plain function pointer that
/// takes a [`MigrationContext`] and returns a boxed future resolving to
/// `anyhow::Result<()>`. The future may borrow from the context for `'tx`.
pub type CoreMigrationFn = for<'tx> fn(
    MigrationContext<'tx>,
) -> Pin<
    Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
>;
2188
2189pub fn get_current_database_version<F>(
2193 migrations: &BTreeMap<DatabaseVersion, F>,
2194) -> DatabaseVersion {
2195 let versions = migrations.keys().copied().collect::<Vec<_>>();
2196
2197 if !versions
2200 .windows(2)
2201 .all(|window| window[0].increment() == window[1])
2202 {
2203 panic!("Database Migrations are not defined contiguously");
2204 }
2205
2206 versions
2207 .last()
2208 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2209}
2210
/// Convenience wrapper around [`apply_migrations`] that passes `None` for
/// both the module instance id and the external prefix bound.
pub async fn apply_migrations_server(
    db: &Database,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
) -> Result<(), anyhow::Error> {
    apply_migrations(db, kind, migrations, None, None).await
}
2219
2220pub async fn apply_migrations(
2232 db: &Database,
2233 kind: String,
2234 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2235 module_instance_id: Option<ModuleInstanceId>,
2236 external_prefixes_above: Option<u8>,
2239) -> Result<(), anyhow::Error> {
2240 let mut dbtx = db.begin_transaction_nc().await;
2243 let is_new_db = dbtx
2244 .raw_find_by_prefix(&[])
2245 .await?
2246 .filter(|(key, _v)| {
2247 std::future::ready(
2248 external_prefixes_above.map_or(true, |external_prefixes_above| {
2249 !key.is_empty() && key[0] < external_prefixes_above
2250 }),
2251 )
2252 })
2253 .next()
2254 .await
2255 .is_none();
2256
2257 let target_db_version = get_current_database_version(&migrations);
2258
2259 create_database_version(
2261 db,
2262 target_db_version,
2263 module_instance_id,
2264 kind.clone(),
2265 is_new_db,
2266 )
2267 .await?;
2268
2269 let mut global_dbtx = db.begin_transaction().await;
2270 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2271
2272 let disk_version = global_dbtx
2273 .get_value(&DatabaseVersionKey(module_instance_id_key))
2274 .await;
2275
2276 let db_version = if let Some(disk_version) = disk_version {
2277 let mut current_db_version = disk_version;
2278
2279 if current_db_version > target_db_version {
2280 return Err(anyhow::anyhow!(format!(
2281 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2282 )));
2283 }
2284
2285 while current_db_version < target_db_version {
2286 if let Some(migration) = migrations.get(¤t_db_version) {
2287 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2288 migration(MigrationContext {
2289 dbtx: global_dbtx.to_ref_nc(),
2290 module_instance_id,
2291 })
2292 .await?;
2293 } else {
2294 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2295 }
2296
2297 current_db_version = current_db_version.increment();
2298 global_dbtx
2299 .insert_entry(
2300 &DatabaseVersionKey(module_instance_id_key),
2301 ¤t_db_version,
2302 )
2303 .await;
2304 }
2305
2306 current_db_version
2307 } else {
2308 target_db_version
2309 };
2310
2311 global_dbtx.commit_tx_result().await?;
2312 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2313 Ok(())
2314}
2315
2316pub async fn create_database_version(
2320 db: &Database,
2321 target_db_version: DatabaseVersion,
2322 module_instance_id: Option<ModuleInstanceId>,
2323 kind: String,
2324 is_new_db: bool,
2325) -> Result<(), anyhow::Error> {
2326 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2327
2328 let mut global_dbtx = db.begin_transaction().await;
2332 if global_dbtx
2333 .get_value(&DatabaseVersionKey(key_module_instance_id))
2334 .await
2335 .is_none()
2336 {
2337 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2346 remove_current_db_version_if_exists(
2347 &mut global_dbtx
2348 .to_ref_with_prefix_module_id(module_instance_id)
2349 .0
2350 .into_nc(),
2351 is_new_db,
2352 target_db_version,
2353 )
2354 .await
2355 } else {
2356 remove_current_db_version_if_exists(
2357 &mut global_dbtx.to_ref().into_nc(),
2358 is_new_db,
2359 target_db_version,
2360 )
2361 .await
2362 };
2363
2364 info!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2366 global_dbtx
2367 .insert_new_entry(
2368 &DatabaseVersionKey(key_module_instance_id),
2369 ¤t_version_in_module,
2370 )
2371 .await;
2372 global_dbtx.commit_tx_result().await?;
2373 }
2374
2375 Ok(())
2376}
2377
2378async fn remove_current_db_version_if_exists(
2383 version_dbtx: &mut DatabaseTransaction<'_>,
2384 is_new_db: bool,
2385 target_db_version: DatabaseVersion,
2386) -> DatabaseVersion {
2387 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2391 match current_version_in_module {
2392 Some(database_version) => database_version,
2393 None if is_new_db => target_db_version,
2394 None => DatabaseVersion(0),
2395 }
2396}
2397
2398fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2401 module_instance_id.map_or_else(
2403 || MODULE_GLOBAL_PREFIX.into(),
2404 |module_instance_id| module_instance_id,
2405 )
2406}
2407
/// Argument handed to every migration function (see `CoreMigrationFn`).
pub struct MigrationContext<'tx> {
    /// Non-committable transaction the migration runs in.
    dbtx: DatabaseTransaction<'tx>,
    /// Module the migration belongs to; `None` when no module instance id
    /// was supplied to the migration run.
    module_instance_id: Option<ModuleInstanceId>,
}
2412
2413impl<'tx> MigrationContext<'tx> {
2414 pub fn dbtx(&mut self) -> DatabaseTransaction {
2415 if let Some(module_instance_id) = self.module_instance_id {
2416 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2417 } else {
2418 self.dbtx.to_ref_nc()
2419 }
2420 }
2421
2422 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2423 self.module_instance_id
2424 }
2425
2426 #[doc(hidden)]
2427 pub fn __global_dbtx(&mut self) -> &mut DatabaseTransaction<'tx> {
2428 &mut self.dbtx
2429 }
2430}
2431
2432#[allow(unused_imports)]
2433mod test_utils {
2434 use std::collections::BTreeMap;
2435 use std::time::Duration;
2436
2437 use fedimint_core::db::MigrationContext;
2438 use futures::future::ready;
2439 use futures::{Future, FutureExt, StreamExt};
2440 use rand::Rng;
2441 use tokio::join;
2442
2443 use super::{
2444 apply_migrations, CoreMigrationFn, Database, DatabaseTransaction, DatabaseVersion,
2445 DatabaseVersionKey, DatabaseVersionKeyV0,
2446 };
2447 use crate::core::ModuleKind;
2448 use crate::db::mem_impl::MemDatabase;
2449 use crate::db::{
2450 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2451 };
2452 use crate::encoding::{Decodable, Encodable};
2453 use crate::module::registry::ModuleDecoderRegistry;
2454
2455 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2456 crate::runtime::timeout(Duration::from_millis(10), fut)
2457 .await
2458 .ok()
2459 }
2460
    /// Key prefix bytes used by the test records below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        /// Prefix of `TestKey` and `TestKeyV0`.
        Test = 0x42,
        /// Prefix of `AltTestKey`.
        AltTest = 0x43,
        /// Prefix of `PercentTestKey`; 0x25 is the ASCII code of `%`.
        PercentTestKey = 0x25,
    }
2468
    /// Primary test key; stored under `TestDbKeyPrefix::Test` with
    /// notifications enabled.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Query prefix matching every `TestKey` record.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2482
    /// Two-field key that shares the `TestDbKeyPrefix::Test` prefix with
    /// `TestKey`.
    // NOTE(review): the `V0` name suggests an older key layout used by
    // migration tests; confirm against the tests that use it.
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Query prefix matching every `TestKeyV0` record.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2495
    /// Second test key type, stored under its own prefix
    /// (`TestDbKeyPrefix::AltTest`) so prefix queries can be told apart.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Query prefix matching every `AltTestKey` record.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2508
    /// Test key stored under `TestDbKeyPrefix::PercentTestKey` (byte 0x25).
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Query prefix matching every `PercentTestKey` record.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by all test records.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);
2524
    // Module instance ids used by tests that exercise module isolation.
    // NOTE(review): their uses are outside the visible part of the file.
    const TEST_MODULE_PREFIX: u16 = 1;
    const ALT_MODULE_PREFIX: u16 = 2;
2527
2528 pub async fn verify_insert_elements(db: Database) {
2529 let mut dbtx = db.begin_transaction().await;
2530 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2531 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2532 dbtx.commit_tx().await;
2533
2534 let mut dbtx = db.begin_transaction().await;
2536 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2537 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2538 dbtx.commit_tx().await;
2539
2540 let mut dbtx = db.begin_transaction().await;
2542 assert_eq!(
2543 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2544 Some(TestVal(2))
2545 );
2546 assert_eq!(
2547 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2548 Some(TestVal(3))
2549 );
2550 dbtx.commit_tx().await;
2551
2552 let mut dbtx = db.begin_transaction().await;
2553 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2554 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2555 dbtx.commit_tx().await;
2556 }
2557
2558 pub async fn verify_remove_nonexisting(db: Database) {
2559 let mut dbtx = db.begin_transaction().await;
2560 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2561 let removed = dbtx.remove_entry(&TestKey(1)).await;
2562 assert!(removed.is_none());
2563
2564 dbtx.commit_tx().await;
2566 }
2567
2568 pub async fn verify_remove_existing(db: Database) {
2569 let mut dbtx = db.begin_transaction().await;
2570
2571 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2572
2573 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2574
2575 let removed = dbtx.remove_entry(&TestKey(1)).await;
2576 assert_eq!(removed, Some(TestVal(2)));
2577 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2578
2579 dbtx.commit_tx().await;
2581 }
2582
2583 pub async fn verify_read_own_writes(db: Database) {
2584 let mut dbtx = db.begin_transaction().await;
2585
2586 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2587
2588 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2589
2590 dbtx.commit_tx().await;
2592 }
2593
    /// Checks that an uncommitted write of one transaction is not visible
    /// to another transaction.
    pub async fn verify_prevent_dirty_reads(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());

        // Started while the first transaction is still uncommitted: it must
        // not observe the pending insert.
        let mut dbtx2 = db.begin_transaction().await;
        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);

        dbtx.commit_tx().await;
    }
2606
    /// Checks `find_by_range`: results are limited to the requested key
    /// type, the end of the range is exclusive, results come back in
    /// ascending key order, and a module-prefixed transaction sees the
    /// entry written under that module prefix.
    pub async fn verify_find_by_range(db: Database) {
        let mut dbtx = db.begin_transaction().await;
        // Inserted out of order on purpose.
        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;

        // Same numeric keys under a different prefix; must not show up in
        // the `TestKey` ranges below.
        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;

        {
            // Entry written through a transaction prefixed for module 2.
            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
            module_dbtx
                .insert_entry(&TestKey(300), &TestVal(3000))
                .await;
        }

        dbtx.commit_tx().await;

        let mut dbtx = db.begin_transaction_nc().await;

        // 55..56 contains only key 55 (end exclusive).
        let returned_keys = dbtx
            .find_by_range(TestKey(55)..TestKey(56))
            .await
            .collect::<Vec<_>>()
            .await;

        let expected = vec![(TestKey(55), TestVal(9999))];

        assert_eq!(returned_keys, expected);

        let returned_keys = dbtx
            .find_by_range(TestKey(54)..TestKey(56))
            .await
            .collect::<Vec<_>>()
            .await;

        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
        assert_eq!(returned_keys, expected);

        let returned_keys = dbtx
            .find_by_range(TestKey(54)..TestKey(57))
            .await
            .collect::<Vec<_>>()
            .await;

        let expected = vec![
            (TestKey(54), TestVal(8888)),
            (TestKey(55), TestVal(9999)),
            (TestKey(56), TestVal(7777)),
        ];
        assert_eq!(returned_keys, expected);

        // Range query through a transaction prefixed for module 2 finds the
        // entry written under that prefix.
        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
        let test_range = module_dbtx
            .find_by_range(TestKey(300)..TestKey(301))
            .await
            .collect::<Vec<_>>()
            .await;
        assert!(test_range.len() == 1);
    }
2668
    /// Checks `find_by_prefix` and `find_by_prefix_sorted_descending`: each
    /// prefix only yields its own records, in ascending key order, and the
    /// descending variant yields the same records reversed.
    pub async fn verify_find_by_prefix(db: Database) {
        let mut dbtx = db.begin_transaction().await;
        // Inserted out of order on purpose.
        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;

        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
        dbtx.commit_tx().await;

        let mut dbtx = db.begin_transaction().await;

        // Only `TestKey` records, ascending.
        let returned_keys = dbtx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;

        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
        assert_eq!(returned_keys, expected);

        // The descending query must be the exact reverse.
        let reversed = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        let mut reversed_expected = expected;
        reversed_expected.reverse();
        assert_eq!(reversed, reversed_expected);

        // Same checks for the alternative prefix.
        let returned_keys = dbtx
            .find_by_prefix(&AltDbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;

        let expected = vec![
            (AltTestKey(54), TestVal(6666)),
            (AltTestKey(55), TestVal(7777)),
        ];
        assert_eq!(returned_keys, expected);

        let reversed = dbtx
            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        let mut reversed_expected = expected;
        reversed_expected.reverse();
        assert_eq!(reversed, reversed_expected);
    }
2720
2721 pub async fn verify_commit(db: Database) {
2722 let mut dbtx = db.begin_transaction().await;
2723
2724 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2725 dbtx.commit_tx().await;
2726
2727 let mut dbtx2 = db.begin_transaction().await;
2729 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2730 }
2731
2732 pub async fn verify_rollback_to_savepoint(db: Database) {
2733 let mut dbtx_rollback = db.begin_transaction().await;
2734
2735 dbtx_rollback
2736 .insert_entry(&TestKey(20), &TestVal(2000))
2737 .await;
2738
2739 dbtx_rollback
2740 .set_tx_savepoint()
2741 .await
2742 .expect("Error setting transaction savepoint");
2743
2744 dbtx_rollback
2745 .insert_entry(&TestKey(21), &TestVal(2001))
2746 .await;
2747
2748 assert_eq!(
2749 dbtx_rollback.get_value(&TestKey(20)).await,
2750 Some(TestVal(2000))
2751 );
2752 assert_eq!(
2753 dbtx_rollback.get_value(&TestKey(21)).await,
2754 Some(TestVal(2001))
2755 );
2756
2757 dbtx_rollback
2758 .rollback_tx_to_savepoint()
2759 .await
2760 .expect("Error setting transaction savepoint");
2761
2762 assert_eq!(
2763 dbtx_rollback.get_value(&TestKey(20)).await,
2764 Some(TestVal(2000))
2765 );
2766
2767 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2768
2769 dbtx_rollback.commit_tx().await;
2771 }
2772
2773 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2774 let mut dbtx = db.begin_transaction().await;
2775 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2776
2777 let mut dbtx2 = db.begin_transaction().await;
2778
2779 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2780
2781 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2782
2783 dbtx2.commit_tx().await;
2784
2785 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2788
2789 let expected_keys = 0;
2790 let returned_keys = dbtx
2791 .find_by_prefix(&DbPrefixTestPrefix)
2792 .await
2793 .fold(0, |returned_keys, (key, value)| async move {
2794 if key == TestKey(100) {
2795 assert!(value.eq(&TestVal(101)));
2796 }
2797 returned_keys + 1
2798 })
2799 .await;
2800
2801 assert_eq!(returned_keys, expected_keys);
2802 }
2803
2804 pub async fn verify_snapshot_isolation(db: Database) {
2805 async fn random_yield() {
2806 let times = if rand::thread_rng().gen_bool(0.5) {
2807 0
2808 } else {
2809 10
2810 };
2811 for _ in 0..times {
2812 tokio::task::yield_now().await;
2813 }
2814 }
2815
2816 for i in 0..1000 {
2818 let base_key = i * 2;
2819 let tx_accepted_key = base_key;
2820 let spent_input_key = base_key + 1;
2821
2822 join!(
2823 async {
2824 random_yield().await;
2825 let mut dbtx = db.begin_transaction().await;
2826
2827 random_yield().await;
2828 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2829 random_yield().await;
2830 let s = match i % 5 {
2833 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2834 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2835 2 => {
2836 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2837 .await
2838 }
2839 3 => {
2840 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2841 .await
2842 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2843 .map(|(_k, v)| v)
2844 .next()
2845 .await
2846 }
2847 4 => {
2848 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2849 .await
2850 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2851 .map(|(_k, v)| v)
2852 .next()
2853 .await
2854 }
2855 _ => {
2856 panic!("woot?");
2857 }
2858 };
2859
2860 match (a, s) {
2861 (None, None) | (Some(_), Some(_)) => {}
2862 (None, Some(_)) => panic!("none some?! {i}"),
2863 (Some(_), None) => panic!("some none?! {i}"),
2864 }
2865 },
2866 async {
2867 random_yield().await;
2868
2869 let mut dbtx = db.begin_transaction().await;
2870 random_yield().await;
2871 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2872
2873 random_yield().await;
2874 assert_eq!(
2875 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2876 .await,
2877 None
2878 );
2879
2880 random_yield().await;
2881 assert_eq!(
2882 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2883 .await,
2884 None
2885 );
2886 random_yield().await;
2887 dbtx.commit_tx().await;
2888 }
2889 );
2890 }
2891 }
2892
2893 pub async fn verify_phantom_entry(db: Database) {
2894 let mut dbtx = db.begin_transaction().await;
2895
2896 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2897
2898 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2899
2900 dbtx.commit_tx().await;
2901
2902 let mut dbtx = db.begin_transaction().await;
2903 let expected_keys = 2;
2904 let returned_keys = dbtx
2905 .find_by_prefix(&DbPrefixTestPrefix)
2906 .await
2907 .fold(0, |returned_keys, (key, value)| async move {
2908 match key {
2909 TestKey(100) => {
2910 assert!(value.eq(&TestVal(101)));
2911 }
2912 TestKey(101) => {
2913 assert!(value.eq(&TestVal(102)));
2914 }
2915 _ => {}
2916 };
2917 returned_keys + 1
2918 })
2919 .await;
2920
2921 assert_eq!(returned_keys, expected_keys);
2922
2923 let mut dbtx2 = db.begin_transaction().await;
2924
2925 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2926
2927 dbtx2.commit_tx().await;
2928
2929 let returned_keys = dbtx
2930 .find_by_prefix(&DbPrefixTestPrefix)
2931 .await
2932 .fold(0, |returned_keys, (key, value)| async move {
2933 match key {
2934 TestKey(100) => {
2935 assert!(value.eq(&TestVal(101)));
2936 }
2937 TestKey(101) => {
2938 assert!(value.eq(&TestVal(102)));
2939 }
2940 _ => {}
2941 };
2942 returned_keys + 1
2943 })
2944 .await;
2945
2946 assert_eq!(returned_keys, expected_keys);
2947 }
2948
2949 pub async fn expect_write_conflict(db: Database) {
2950 let mut dbtx = db.begin_transaction().await;
2951 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2952 dbtx.commit_tx().await;
2953
2954 let mut dbtx2 = db.begin_transaction().await;
2955 let mut dbtx3 = db.begin_transaction().await;
2956
2957 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2958
2959 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
2963
2964 dbtx2.commit_tx().await;
2965 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
2966 }
2967
2968 pub async fn verify_string_prefix(db: Database) {
2969 let mut dbtx = db.begin_transaction().await;
2970 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
2971
2972 assert_eq!(
2973 dbtx.get_value(&PercentTestKey(100)).await,
2974 Some(TestVal(101))
2975 );
2976
2977 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2978
2979 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2980
2981 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2982
2983 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
2986
2987 let expected_keys = 4;
2988 let returned_keys = dbtx
2989 .find_by_prefix(&PercentPrefixTestPrefix)
2990 .await
2991 .fold(0, |returned_keys, (key, value)| async move {
2992 if matches!(key, PercentTestKey(101)) {
2993 assert!(value.eq(&TestVal(100)));
2994 }
2995 returned_keys + 1
2996 })
2997 .await;
2998
2999 assert_eq!(returned_keys, expected_keys);
3000 }
3001
3002 pub async fn verify_remove_by_prefix(db: Database) {
3003 let mut dbtx = db.begin_transaction().await;
3004
3005 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3006
3007 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3008
3009 dbtx.commit_tx().await;
3010
3011 let mut remove_dbtx = db.begin_transaction().await;
3012 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3013 remove_dbtx.commit_tx().await;
3014
3015 let mut dbtx = db.begin_transaction().await;
3016 let expected_keys = 0;
3017 let returned_keys = dbtx
3018 .find_by_prefix(&DbPrefixTestPrefix)
3019 .await
3020 .fold(0, |returned_keys, (key, value)| async move {
3021 match key {
3022 TestKey(100) => {
3023 assert!(value.eq(&TestVal(101)));
3024 }
3025 TestKey(101) => {
3026 assert!(value.eq(&TestVal(102)));
3027 }
3028 _ => {}
3029 };
3030 returned_keys + 1
3031 })
3032 .await;
3033
3034 assert_eq!(returned_keys, expected_keys);
3035 }
3036
3037 pub async fn verify_module_db(db: Database, module_db: Database) {
3038 let mut dbtx = db.begin_transaction().await;
3039
3040 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3041
3042 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3043
3044 dbtx.commit_tx().await;
3045
3046 let mut module_dbtx = module_db.begin_transaction().await;
3048 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3049
3050 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3051
3052 let mut dbtx = db.begin_transaction().await;
3054 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3055
3056 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3057
3058 let mut module_dbtx = module_db.begin_transaction().await;
3059
3060 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3061
3062 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3063
3064 module_dbtx.commit_tx().await;
3065
3066 let expected_keys = 2;
3067 let mut dbtx = db.begin_transaction().await;
3068 let returned_keys = dbtx
3069 .find_by_prefix(&DbPrefixTestPrefix)
3070 .await
3071 .fold(0, |returned_keys, (key, value)| async move {
3072 match key {
3073 TestKey(100) => {
3074 assert!(value.eq(&TestVal(101)));
3075 }
3076 TestKey(101) => {
3077 assert!(value.eq(&TestVal(102)));
3078 }
3079 _ => {}
3080 };
3081 returned_keys + 1
3082 })
3083 .await;
3084
3085 assert_eq!(returned_keys, expected_keys);
3086
3087 let removed = dbtx.remove_entry(&TestKey(100)).await;
3088 assert_eq!(removed, Some(TestVal(101)));
3089 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3090
3091 let mut module_dbtx = module_db.begin_transaction().await;
3092 assert_eq!(
3093 module_dbtx.get_value(&TestKey(100)).await,
3094 Some(TestVal(103))
3095 );
3096 }
3097
3098 pub async fn verify_module_prefix(db: Database) {
3099 let mut test_dbtx = db.begin_transaction().await;
3100 {
3101 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3102
3103 test_module_dbtx
3104 .insert_entry(&TestKey(100), &TestVal(101))
3105 .await;
3106
3107 test_module_dbtx
3108 .insert_entry(&TestKey(101), &TestVal(102))
3109 .await;
3110 }
3111
3112 test_dbtx.commit_tx().await;
3113
3114 let mut alt_dbtx = db.begin_transaction().await;
3115 {
3116 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3117
3118 alt_module_dbtx
3119 .insert_entry(&TestKey(100), &TestVal(103))
3120 .await;
3121
3122 alt_module_dbtx
3123 .insert_entry(&TestKey(101), &TestVal(104))
3124 .await;
3125 }
3126
3127 alt_dbtx.commit_tx().await;
3128
3129 let mut test_dbtx = db.begin_transaction().await;
3131 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3132 assert_eq!(
3133 test_module_dbtx.get_value(&TestKey(100)).await,
3134 Some(TestVal(101))
3135 );
3136
3137 assert_eq!(
3138 test_module_dbtx.get_value(&TestKey(101)).await,
3139 Some(TestVal(102))
3140 );
3141
3142 let expected_keys = 2;
3143 let returned_keys = test_module_dbtx
3144 .find_by_prefix(&DbPrefixTestPrefix)
3145 .await
3146 .fold(0, |returned_keys, (key, value)| async move {
3147 match key {
3148 TestKey(100) => {
3149 assert!(value.eq(&TestVal(101)));
3150 }
3151 TestKey(101) => {
3152 assert!(value.eq(&TestVal(102)));
3153 }
3154 _ => {}
3155 };
3156 returned_keys + 1
3157 })
3158 .await;
3159
3160 assert_eq!(returned_keys, expected_keys);
3161
3162 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3163 assert_eq!(removed, Some(TestVal(101)));
3164 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3165
3166 let mut test_dbtx = db.begin_transaction().await;
3169 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3170
3171 test_dbtx.commit_tx().await;
3172 }
3173
/// Runs a single version-0 migration against an in-memory database and
/// checks the migrated records.
///
/// 100 `TestKeyV0(i, i + 1)` entries with value `TestVal(i)` are stored
/// together with a version-0 marker under `DatabaseVersionKeyV0`. After
/// `apply_migrations`, a version entry for `MODULE_GLOBAL_PREFIX` must exist
/// and the 100 entries must be readable as `TestKey` records whose key is the
/// stored value plus one.
#[cfg(test)]
#[tokio::test]
pub async fn verify_test_migration() {
    let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
    let expected_test_keys_size: usize = 100;

    let mut seed_tx = db.begin_transaction().await;
    for i in 0..expected_test_keys_size {
        let old_key = TestKeyV0(i as u64, (i + 1) as u64);
        seed_tx.insert_new_entry(&old_key, &TestVal(i as u64)).await;
    }

    seed_tx
        .insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
        .await;
    seed_tx.commit_tx().await;

    let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();

    migrations.insert(DatabaseVersion(0), |ctx| {
        migrate_test_db_version_0(ctx).boxed()
    });

    apply_migrations(&db, "TestModule".to_string(), migrations, None, None)
        .await
        .expect("Error applying migrations for TestModule");

    let mut check_tx = db.begin_transaction().await;

    assert!(check_tx
        .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
        .await
        .is_some());

    let migrated: Vec<_> = check_tx
        .find_by_prefix(&DbPrefixTestPrefix)
        .await
        .collect()
        .await;
    let test_keys_size = migrated.len();
    assert_eq!(test_keys_size, expected_test_keys_size);
    for (key, val) in migrated {
        assert_eq!(key.0, val.0 + 1);
    }
}
3223
3224 #[allow(dead_code)]
3225 async fn migrate_test_db_version_0(mut ctx: MigrationContext<'_>) -> Result<(), anyhow::Error> {
3226 let mut dbtx = ctx.dbtx();
3227 let example_keys_v0 = dbtx
3228 .find_by_prefix(&DbPrefixTestPrefixV0)
3229 .await
3230 .collect::<Vec<_>>()
3231 .await;
3232 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3233 for (key, val) in example_keys_v0 {
3234 let key_v2 = TestKey(key.1);
3235 dbtx.insert_new_entry(&key_v2, &val).await;
3236 }
3237 Ok(())
3238 }
3239
/// Checks that `Database::autocommit` gives up after the requested number of
/// attempts when committing always fails.
///
/// A fake raw database is used whose transactions return an error from
/// `commit_tx` and leave every other operation unimplemented. With a limit of
/// `Some(5)`, the returned error must be `AutocommitError::CommitFailed`
/// reporting 5 attempts.
#[cfg(test)]
#[tokio::test]
async fn test_autocommit() {
    use std::marker::PhantomData;
    use std::ops::Range;
    use std::path::Path;

    use anyhow::anyhow;
    use async_trait::async_trait;

    use crate::db::{
        AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
        IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
        IRawDatabaseTransaction,
    };
    use crate::ModuleDecoderRegistry;

    /// Raw database whose transactions can never be committed.
    #[derive(Debug)]
    struct UncommittableDb;

    /// Transaction handed out by `UncommittableDb`.
    #[derive(Debug)]
    struct UncommittableTx<'a>(PhantomData<&'a ()>);

    #[async_trait]
    impl IRawDatabase for UncommittableDb {
        type Transaction<'a> = UncommittableTx<'a>;
        async fn begin_transaction(&self) -> UncommittableTx {
            UncommittableTx(PhantomData)
        }

        fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
            Ok(())
        }
    }

    #[async_trait]
    impl<'a> IRawDatabaseTransaction for UncommittableTx<'a> {
        // The only operation the test relies on: committing always fails.
        async fn commit_tx(self) -> anyhow::Result<()> {
            Err(anyhow!("Can't commit!"))
        }
    }

    #[async_trait]
    impl<'a> IDatabaseTransactionOps for UncommittableTx<'a> {
        async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
            unimplemented!()
        }

        async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl<'a> IDatabaseTransactionOpsCore for UncommittableTx<'a> {
        async fn raw_insert_bytes(
            &mut self,
            _key: &[u8],
            _value: &[u8],
        ) -> anyhow::Result<Option<Vec<u8>>> {
            unimplemented!()
        }

        async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
            unimplemented!()
        }

        async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
            unimplemented!()
        }

        async fn raw_find_by_range(
            &mut self,
            _key_range: Range<&[u8]>,
        ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
            unimplemented!()
        }

        async fn raw_find_by_prefix(
            &mut self,
            _key_prefix: &[u8],
        ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
            unimplemented!()
        }

        async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
            unimplemented!()
        }

        async fn raw_find_by_prefix_sorted_descending(
            &mut self,
            _key_prefix: &[u8],
        ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
            unimplemented!()
        }
    }

    let db = Database::new(UncommittableDb, ModuleDecoderRegistry::default());

    // The closure itself succeeds, so any failure must come from committing.
    let outcome = db
        .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
        .await;
    let err = outcome.unwrap_err();

    match err {
        AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
        AutocommitError::CommitFailed { attempts, .. } => {
            assert_eq!(attempts, 5);
        }
    }
}
3353}
3354
3355pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3356 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3357 decoders: ModuleDecoderRegistry,
3358 key_prefix: &KP,
3359) -> impl Stream<
3360 Item = (
3361 KP::Record,
3362 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3363 ),
3364> + 'r
3365where
3366 'inner: 'r,
3367 KP: DatabaseLookup,
3368 KP::Record: DatabaseKey,
3369{
3370 debug!("find by prefix sorted descending");
3371 let prefix_bytes = key_prefix.to_bytes();
3372 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3373 .await
3374 .expect("Error doing prefix search in database")
3375 .map(move |(key_bytes, value_bytes)| {
3376 let key = decode_key_expect(&key_bytes, &decoders);
3377 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3378 (key, value)
3379 })
3380}
3381
/// Tests for `wait_key_exists` notifications and for `global_dbtx` access
/// from prefixed databases, all run against the in-memory database.
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::mem_impl::MemDatabase;
    use super::*;
    use crate::runtime::spawn;

    /// Spawns a task that waits for `key` via `wait_key_exists` and returns
    /// its join handle.
    ///
    /// This function only returns once the spawned task has created the wait
    /// future and signalled that over a oneshot channel.
    async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> {
        let task_db = db.clone();
        let (subscribed_tx, subscribed_rx) = oneshot::channel::<()>();
        let handle = spawn("wait key exists", async move {
            let subscription = task_db.wait_key_exists(&key);
            subscribed_tx.send(()).unwrap();
            subscription.await
        });
        subscribed_rx.await.unwrap();
        handle
    }

    /// Runs the waiter task through `future_returns_shortly` and returns
    /// whatever that helper reports.
    async fn outcome(task: tokio::task::JoinHandle<TestVal>) -> Option<TestVal> {
        future_returns_shortly(async { task.await.unwrap() }).await
    }

    /// Waiter registered before the writing transaction is even started.
    #[tokio::test]
    async fn test_wait_key_before_transaction() {
        let db = MemDatabase::new().into_database();
        let (key, val) = (TestKey(1), TestVal(2));

        let key_task = waiter(&db, TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        assert_eq!(outcome(key_task).await, Some(TestVal(2)), "should notify");
    }

    /// Waiter registered after the transaction started, before the insert.
    #[tokio::test]
    async fn test_wait_key_before_insert() {
        let db = MemDatabase::new().into_database();
        let (key, val) = (TestKey(1), TestVal(2));

        let mut dbtx = db.begin_transaction().await;
        let key_task = waiter(&db, TestKey(1)).await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        assert_eq!(outcome(key_task).await, Some(TestVal(2)), "should notify");
    }

    /// Waiter registered after the insert, before the commit.
    #[tokio::test]
    async fn test_wait_key_after_insert() {
        let db = MemDatabase::new().into_database();
        let (key, val) = (TestKey(1), TestVal(2));

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;

        let key_task = waiter(&db, TestKey(1)).await;

        dbtx.commit_tx().await;

        assert_eq!(outcome(key_task).await, Some(TestVal(2)), "should notify");
    }

    /// Waiter registered only after the value has been committed.
    #[tokio::test]
    async fn test_wait_key_after_commit() {
        let db = MemDatabase::new().into_database();
        let (key, val) = (TestKey(1), TestVal(2));

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        let key_task = waiter(&db, TestKey(1)).await;
        assert_eq!(outcome(key_task).await, Some(TestVal(2)), "should notify");
    }

    /// Waiter and writer both go through the same module-prefixed database.
    #[tokio::test]
    async fn test_wait_key_isolated_db() {
        let module_instance_id = 10;
        let (key, val) = (TestKey(1), TestVal(2));
        let module_db = MemDatabase::new()
            .into_database()
            .with_prefix_module_id(module_instance_id)
            .0;

        let key_task = waiter(&module_db, TestKey(1)).await;

        let mut dbtx = module_db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        assert_eq!(outcome(key_task).await, Some(TestVal(2)), "should notify");
    }

    /// Waiter uses a module-prefixed database; the writer uses a
    /// module-prefixed view of a transaction on the unprefixed database.
    #[tokio::test]
    async fn test_wait_key_isolated_tx() {
        let module_instance_id = 10;
        let (key, val) = (TestKey(1), TestVal(2));
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db.with_prefix_module_id(module_instance_id).0, TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        let mut module_view = dbtx.to_ref_with_prefix_module_id(module_instance_id).0;
        module_view.insert_new_entry(&key, &val).await;
        // Release the borrow of `dbtx` before committing it.
        drop(module_view);
        dbtx.commit_tx().await;

        assert_eq!(outcome(key_task).await, Some(TestVal(2)), "should notify");
    }

    /// Nothing is ever written, so the waiter must not complete.
    #[tokio::test]
    async fn test_wait_key_no_transaction() {
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db, TestKey(1)).await;
        assert_eq!(outcome(key_task).await, None, "should not notify");
    }

    /// Entries written through `global_dbtx` of a module-prefixed database
    /// (optionally with a further nested prefix) must be readable through
    /// the unprefixed database.
    #[tokio::test]
    async fn test_prefix_global_dbtx() {
        let module_instance_id = 10;
        let db = MemDatabase::new().into_database();

        {
            let (module_db, access_token) = db.with_prefix_module_id(module_instance_id);

            let mut module_tx = module_db.begin_transaction().await;
            let mut global_tx = module_tx.global_dbtx(access_token);
            global_tx.insert_new_entry(&TestKey(1), &TestVal(1)).await;
            global_tx.commit_tx().await;
        }

        assert_eq!(
            db.begin_transaction_nc().await.get_value(&TestKey(1)).await,
            Some(TestVal(1))
        );

        {
            let (module_db, access_token) = db.with_prefix_module_id(module_instance_id);

            let nested_db = module_db.with_prefix(vec![3, 4]);

            let mut nested_tx = nested_db.begin_transaction().await;
            let mut global_tx = nested_tx.global_dbtx(access_token);
            global_tx.insert_new_entry(&TestKey(2), &TestVal(2)).await;
            global_tx.commit_tx().await;
        }

        assert_eq!(
            db.begin_transaction_nc().await.get_value(&TestKey(2)).await,
            Some(TestVal(2))
        );
    }

    /// `global_dbtx` on a transaction of the unprefixed database must panic.
    #[tokio::test]
    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
    async fn test_prefix_global_dbtx_panics_on_global_db() {
        let db = MemDatabase::new().into_database();

        let mut dbtx = db.begin_transaction().await;
        let _global = dbtx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
    }

    /// `global_dbtx` on a database prefixed with `with_prefix` (not a module
    /// id) must panic, even with a token built from that same prefix.
    #[tokio::test]
    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
    async fn test_prefix_global_dbtx_panics_on_non_module_prefix() {
        let prefix = vec![3, 4];
        let prefixed_db = MemDatabase::new()
            .into_database()
            .with_prefix(prefix.clone());

        let mut dbtx = prefixed_db.begin_transaction().await;
        let _global = dbtx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&prefix));
    }

    /// `global_dbtx` with a token built from a different prefix than the
    /// database's must panic.
    #[tokio::test]
    #[should_panic(expected = "Illegal to call global_dbtx on BaseDatabaseTransaction")]
    async fn test_prefix_global_dbtx_panics_on_wrong_access_token() {
        let prefix = vec![3, 4];
        let prefixed_db = MemDatabase::new()
            .into_database()
            .with_prefix(prefix.clone());

        let mut dbtx = prefixed_db.begin_transaction().await;
        let _global = dbtx.global_dbtx(GlobalDBTxAccessToken::from_prefix(&[1]));
    }
}