fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4
5pub mod envs;
6
7use std::fmt;
8use std::ops::Range;
9use std::path::Path;
10use std::str::FromStr;
11
12use anyhow::{bail, Context, Result};
13use async_trait::async_trait;
14use fedimint_core::db::{
15    IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
16    PrefixStream,
17};
18use futures::stream;
19pub use rocksdb;
20use rocksdb::{
21    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
22};
23use tracing::debug;
24
25use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
26
27// turn an `iter` into a `Stream` where every `next` is ran inside
28// `block_in_place` to offload the blocking calls
29fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
30where
31    I: Iterator + Send + 'i,
32    I::Item: Send,
33{
34    stream::unfold(iter, |mut iter| async {
35        fedimint_core::runtime::block_in_place(|| {
36            let item = iter.next();
37            item.map(|item| (item, iter))
38        })
39    })
40}
41
/// Read-write database handle: a newtype around a
/// `rocksdb::OptimisticTransactionDB`.
///
/// Implements `IRawDatabase`, handing out [`RocksDbTransaction`]s.
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
44
/// A transaction on a [`RocksDb`]: a newtype around a `rocksdb::Transaction`
/// that borrows the underlying `OptimisticTransactionDB` for `'a`.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
46
47impl RocksDb {
48    pub fn open(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDb> {
49        let mut opts = get_default_options()?;
50        // Since we turned synchronous writes one we should never encounter a corrupted
51        // WAL and should rather fail in this case
52        opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
53        let db: rocksdb::OptimisticTransactionDB =
54            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, &db_path)?;
55        Ok(RocksDb(db))
56    }
57
58    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
59        &self.0
60    }
61}
62
/// Returns `true` when `num` is a power of two, i.e. has exactly one bit set.
///
/// Zero is not considered a power of two.
fn is_power_of_two(num: usize) -> bool {
    // Use the standard library's check rather than counting bits by hand.
    num.is_power_of_two()
}
66
67impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.write_str("RocksDbTransaction")
70    }
71}
72
73impl<'a> fmt::Debug for RocksDbTransaction<'a> {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.write_str("RocksDbTransaction")
76    }
77}
78
/// Spot-checks `is_power_of_two` on small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected);
    }
}
90
91fn get_default_options() -> anyhow::Result<rocksdb::Options> {
92    let mut opts = rocksdb::Options::default();
93    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
94        debug!(var, "Using custom write buffer size");
95        let size: usize = FromStr::from_str(&var)
96            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
97        if !is_power_of_two(size) {
98            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
99        }
100        opts.set_write_buffer_size(size);
101    }
102    opts.create_if_missing(true);
103    Ok(opts)
104}
105
/// Read-only database handle: a newtype around a plain `rocksdb::DB`,
/// created through `RocksDbReadOnly::open_read_only`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
108
/// "Transaction" over a [`RocksDbReadOnly`]: just a borrow of the underlying
/// `rocksdb::DB`. The trait impls in this file panic on every mutating
/// operation.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
110
111impl RocksDbReadOnly {
112    pub fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
113        let opts = get_default_options()?;
114        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
115        Ok(RocksDbReadOnly(db))
116    }
117}
118
119impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
120    fn from(db: OptimisticTransactionDB) -> Self {
121        RocksDb(db)
122    }
123}
124
125impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
126    fn from(db: RocksDb) -> Self {
127        db.0
128    }
129}
130
/// Computes the byte string that follows `prefix` when it is incremented as
/// a big-endian number of the same length.
///
/// When looking up a prefix in reverse order, iteration has to start from
/// "prefix + 1" rather than from `prefix` itself (lexicographic ordering);
/// this helper produces that starting point.
///
/// Returns `None` when there is no next prefix: every byte is `0xFF` (the
/// prefix is already the maximum) or `prefix` is empty.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut incremented = prefix.to_vec();
    // Add one starting at the last byte and carry leftwards while bytes wrap.
    for byte in incremented.iter_mut().rev() {
        *byte = byte.wrapping_add(1);
        if *byte != 0 {
            // No wrap-around on this byte: the carry stops here.
            return Some(incremented);
        }
    }
    // Every byte wrapped (or there were none), so no larger prefix exists.
    None
}
154
155#[async_trait]
156impl IRawDatabase for RocksDb {
157    type Transaction<'a> = RocksDbTransaction<'a>;
158    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
159        let mut optimistic_options = OptimisticTransactionOptions::default();
160        optimistic_options.set_snapshot(true);
161
162        let mut write_options = WriteOptions::default();
163        // Make sure we never lose data on unclean shutdown
164        write_options.set_sync(true);
165
166        let mut rocksdb_tx =
167            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
168        rocksdb_tx
169            .set_tx_savepoint()
170            .await
171            .expect("setting tx savepoint failed");
172
173        rocksdb_tx
174    }
175
176    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
177        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
178        checkpoint.create_checkpoint(backup_path)?;
179        Ok(())
180    }
181}
182
183#[async_trait]
184impl IRawDatabase for RocksDbReadOnly {
185    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
186    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
187        RocksDbReadOnlyTransaction(&self.0)
188    }
189
190    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
191        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
192        checkpoint.create_checkpoint(backup_path)?;
193        Ok(())
194    }
195}
196
197#[async_trait]
198impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
199    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
200        fedimint_core::runtime::block_in_place(|| {
201            let val = self.0.snapshot().get(key).unwrap();
202            self.0.put(key, value)?;
203            Ok(val)
204        })
205    }
206
207    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
208        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
209    }
210
211    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
212        fedimint_core::runtime::block_in_place(|| {
213            let val = self.0.snapshot().get(key).unwrap();
214            self.0.delete(key)?;
215            Ok(val)
216        })
217    }
218
219    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
220        Ok(fedimint_core::runtime::block_in_place(|| {
221            let prefix = key_prefix.to_vec();
222            let mut options = rocksdb::ReadOptions::default();
223            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
224            let iter = self.0.snapshot().iterator_opt(
225                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
226                options,
227            );
228            let rocksdb_iter = iter.map_while(move |res| {
229                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
230                key_bytes
231                    .starts_with(&prefix)
232                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
233            });
234            Box::pin(convert_to_async_stream(rocksdb_iter))
235        }))
236    }
237
238    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
239        Ok(fedimint_core::runtime::block_in_place(|| {
240            let range = Range {
241                start: range.start.to_vec(),
242                end: range.end.to_vec(),
243            };
244            let mut options = rocksdb::ReadOptions::default();
245            options.set_iterate_range(range.clone());
246            let iter = self.0.snapshot().iterator_opt(
247                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
248                options,
249            );
250            let rocksdb_iter = iter.map_while(move |res| {
251                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
252                (key_bytes.as_ref() < range.end.as_slice())
253                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
254            });
255            Box::pin(convert_to_async_stream(rocksdb_iter))
256        }))
257    }
258
259    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
260        fedimint_core::runtime::block_in_place(|| {
261            // Note: delete_range is not supported in Transactions :/
262            let mut options = rocksdb::ReadOptions::default();
263            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
264            let iter = self
265                .0
266                .snapshot()
267                .iterator_opt(
268                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
269                    options,
270                )
271                .map_while(|res| {
272                    res.map(|(key_bytes, _)| {
273                        key_bytes
274                            .starts_with(key_prefix)
275                            .then_some(key_bytes.to_vec())
276                    })
277                    .transpose()
278                });
279
280            for item in iter {
281                let key = item?;
282                self.0.delete(key)?;
283            }
284
285            Ok(())
286        })
287    }
288
289    async fn raw_find_by_prefix_sorted_descending(
290        &mut self,
291        key_prefix: &[u8],
292    ) -> Result<PrefixStream<'_>> {
293        let prefix = key_prefix.to_vec();
294        let next_prefix = next_prefix(&prefix);
295        let iterator_mode = if let Some(next_prefix) = &next_prefix {
296            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
297        } else {
298            rocksdb::IteratorMode::End
299        };
300        Ok(fedimint_core::runtime::block_in_place(|| {
301            let mut options = rocksdb::ReadOptions::default();
302            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
303            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
304            let rocksdb_iter = iter.map_while(move |res| {
305                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
306                key_bytes
307                    .starts_with(&prefix)
308                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
309            });
310            Box::pin(convert_to_async_stream(rocksdb_iter))
311        }))
312    }
313}
314
315#[async_trait]
316impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
317    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
318        Ok(fedimint_core::runtime::block_in_place(|| {
319            self.0.rollback_to_savepoint()
320        })?)
321    }
322
323    async fn set_tx_savepoint(&mut self) -> Result<()> {
324        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
325
326        Ok(())
327    }
328}
329
330#[async_trait]
331impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
332    async fn commit_tx(self) -> Result<()> {
333        fedimint_core::runtime::block_in_place(|| {
334            self.0.commit()?;
335            Ok(())
336        })
337    }
338}
339
340#[async_trait]
341impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
342    async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
343        panic!("Cannot insert into a read only transaction");
344    }
345
346    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
347        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
348    }
349
350    async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
351        panic!("Cannot remove from a read only transaction");
352    }
353
354    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
355        Ok(fedimint_core::runtime::block_in_place(|| {
356            let range = Range {
357                start: range.start.to_vec(),
358                end: range.end.to_vec(),
359            };
360            let mut options = rocksdb::ReadOptions::default();
361            options.set_iterate_range(range.clone());
362            let iter = self.0.snapshot().iterator_opt(
363                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
364                options,
365            );
366            let rocksdb_iter = iter.map_while(move |res| {
367                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
368                (key_bytes.as_ref() < range.end.as_slice())
369                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
370            });
371            Box::pin(convert_to_async_stream(rocksdb_iter))
372        }))
373    }
374
375    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
376        Ok(fedimint_core::runtime::block_in_place(|| {
377            let prefix = key_prefix.to_vec();
378            let mut options = rocksdb::ReadOptions::default();
379            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
380            let iter = self.0.snapshot().iterator_opt(
381                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
382                options,
383            );
384            let rocksdb_iter = iter.map_while(move |res| {
385                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
386                key_bytes
387                    .starts_with(&prefix)
388                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
389            });
390            Box::pin(convert_to_async_stream(rocksdb_iter))
391        }))
392    }
393
394    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
395        panic!("Cannot remove from a read only transaction");
396    }
397
398    async fn raw_find_by_prefix_sorted_descending(
399        &mut self,
400        key_prefix: &[u8],
401    ) -> Result<PrefixStream<'_>> {
402        let prefix = key_prefix.to_vec();
403        let next_prefix = next_prefix(&prefix);
404        let iterator_mode = if let Some(next_prefix) = &next_prefix {
405            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
406        } else {
407            rocksdb::IteratorMode::End
408        };
409        Ok(fedimint_core::runtime::block_in_place(|| {
410            let mut options = rocksdb::ReadOptions::default();
411            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
412            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
413            let rocksdb_iter = iter.map_while(move |res| {
414                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
415                key_bytes
416                    .starts_with(&prefix)
417                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
418            });
419            Box::pin(stream::iter(rocksdb_iter))
420        }))
421    }
422}
423
/// Savepoints are not available on a read-only transaction: both methods
/// panic unconditionally.
#[async_trait]
impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
    /// Always panics; there is nothing to roll back on a read-only
    /// transaction.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// Always panics; a read-only transaction has no savepoints.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
434
/// Committing is not available on a read-only transaction.
#[async_trait]
impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
    /// Always panics; a read-only transaction has nothing to commit.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
441
// Tests for the RocksDB backend. Most of them hand a freshly opened temporary
// database to a shared `fedimint_core::db::verify_*` / `expect_*` helper; the
// rest cover `next_prefix` and descending prefix iteration directly.
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a `Database` backed by a new temporary directory whose name
    /// starts with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        // NOTE(review): `path` is moved into `RocksDb::open`, which takes its
        // argument by value, so the temp-dir guard is dropped when `open`
        // returns - presumably removing the directory while the database is
        // still open. Confirm this is intended.
        Database::new(
            RocksDb::open(path).unwrap(),
            ModuleDecoderRegistry::default(),
        )
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        fedimint_core::db::verify_remove_nonexisting(open_temp_db(
            "fcb-rocksdb-test-remove-nonexisting",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-dirty-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-nonrepeatable-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        fedimint_core::db::verify_snapshot_isolation(open_temp_db(
            "fcb-rocksdb-test-snapshot-isolation",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        fedimint_core::db::verify_rollback_to_savepoint(open_temp_db(
            "fcb-rocksdb-test-rollback-to-savepoint",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        fedimint_core::db::verify_remove_by_prefix(open_temp_db(
            "fcb-rocksdb-test-remove-by-prefix",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
            .await;
    }

    /// Runs `verify_module_db` with two separately opened temporary
    /// databases: one passed as-is, the other scoped to module instance 1
    /// via `with_prefix_module_id`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::open(path).unwrap(),
            ModuleDecoderRegistry::default(),
        );

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    /// Checks `next_prefix` for plain increments, carries across `255`
    /// bytes, and all-`255` inputs (which have no next prefix).
    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        // this is a "max" prefix
        assert!(next_prefix(&[255, 255, 255]).is_none());
        // these are the common case
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none()); // this is a "max" prefix
    }

    // Key prefixes used by the descending-order test: one ordinary prefix
    // (254) and the maximum possible one (255), for which `next_prefix`
    // returns `None`.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    // Key/value record stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    // Query prefix used to look up `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    // Key/value record stored under `TestDbKeyPrefix::MaxTest`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    // Query prefix used to look up `TestKey2` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Inserts three entries under each test prefix and checks that
    /// `find_by_prefix_sorted_descending` returns them in descending key
    /// order, first inside the writing transaction and then, after commit,
    /// through a read-only database opened on the same directory.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        // Scope the read-write database so it is dropped before the same
        // directory is reopened read-only below.
        {
            let db = Database::new(
                RocksDb::open(&path).unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            // Same check for the maximum prefix, which has no "next prefix".
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Test readonly implementation
        let db_readonly = RocksDbReadOnly::open_read_only(path).unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}
725}