1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4
5pub mod envs;
6
7use std::fmt;
8use std::ops::Range;
9use std::path::Path;
10use std::str::FromStr;
11
12use anyhow::{bail, Context, Result};
13use async_trait::async_trait;
14use fedimint_core::db::{
15 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
16 PrefixStream,
17};
18use futures::stream;
19pub use rocksdb;
20use rocksdb::{
21 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
22};
23use tracing::debug;
24
25use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
26
27fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
30where
31 I: Iterator + Send + 'i,
32 I::Item: Send,
33{
34 stream::unfold(iter, |mut iter| async {
35 fedimint_core::runtime::block_in_place(|| {
36 let item = iter.next();
37 item.map(|item| (item, iter))
38 })
39 })
40}
41
42#[derive(Debug)]
43pub struct RocksDb(rocksdb::OptimisticTransactionDB);
44
45pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
46
47impl RocksDb {
48 pub fn open(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDb> {
49 let mut opts = get_default_options()?;
50 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
53 let db: rocksdb::OptimisticTransactionDB =
54 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, &db_path)?;
55 Ok(RocksDb(db))
56 }
57
58 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
59 &self.0
60 }
61}
62
/// Returns `true` when `num` is a power of two, i.e. has exactly one bit set.
///
/// Zero is not a power of two and yields `false`.
fn is_power_of_two(num: usize) -> bool {
    // Resolves to the inherent `usize::is_power_of_two`, not to this function.
    num.is_power_of_two()
}
66
67impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.write_str("RocksDbTransaction")
70 }
71}
72
73impl<'a> fmt::Debug for RocksDbTransaction<'a> {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.write_str("RocksDbTransaction")
76 }
77}
78
#[test]
fn is_power_of_two_sanity() {
    // Values with exactly one bit set.
    for power in [1, 2, 4, 2 << 10] {
        assert!(is_power_of_two(power));
    }

    // Zero and values with more than one bit set.
    for other in [0, 3, 5, (2 << 10) + 1] {
        assert!(!is_power_of_two(other));
    }
}
90
91fn get_default_options() -> anyhow::Result<rocksdb::Options> {
92 let mut opts = rocksdb::Options::default();
93 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
94 debug!(var, "Using custom write buffer size");
95 let size: usize = FromStr::from_str(&var)
96 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
97 if !is_power_of_two(size) {
98 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
99 }
100 opts.set_write_buffer_size(size);
101 }
102 opts.create_if_missing(true);
103 Ok(opts)
104}
105
/// Read-only database handle wrapping a plain `rocksdb::DB`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
108
/// Transaction handle of a [`RocksDbReadOnly`] database; it only borrows the
/// underlying `rocksdb::DB`.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
110
111impl RocksDbReadOnly {
112 pub fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
113 let opts = get_default_options()?;
114 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
115 Ok(RocksDbReadOnly(db))
116 }
117}
118
119impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
120 fn from(db: OptimisticTransactionDB) -> Self {
121 RocksDb(db)
122 }
123}
124
125impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
126 fn from(db: RocksDb) -> Self {
127 db.0
128 }
129}
130
/// Returns the byte string that follows `prefix` when it is treated as a
/// fixed-width big-endian number, or `None` when no such value exists.
///
/// The last byte that is not `0xff` is incremented and every byte after it is
/// reset to zero. An empty prefix, or one made only of `0xff` bytes, has no
/// successor.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Rightmost byte that can be incremented without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Everything to the right of the pivot was `0xff` and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
154
155#[async_trait]
156impl IRawDatabase for RocksDb {
157 type Transaction<'a> = RocksDbTransaction<'a>;
158 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
159 let mut optimistic_options = OptimisticTransactionOptions::default();
160 optimistic_options.set_snapshot(true);
161
162 let mut write_options = WriteOptions::default();
163 write_options.set_sync(true);
165
166 let mut rocksdb_tx =
167 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
168 rocksdb_tx
169 .set_tx_savepoint()
170 .await
171 .expect("setting tx savepoint failed");
172
173 rocksdb_tx
174 }
175
176 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
177 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
178 checkpoint.create_checkpoint(backup_path)?;
179 Ok(())
180 }
181}
182
183#[async_trait]
184impl IRawDatabase for RocksDbReadOnly {
185 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
186 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
187 RocksDbReadOnlyTransaction(&self.0)
188 }
189
190 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
191 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
192 checkpoint.create_checkpoint(backup_path)?;
193 Ok(())
194 }
195}
196
197#[async_trait]
198impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
199 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
200 fedimint_core::runtime::block_in_place(|| {
201 let val = self.0.snapshot().get(key).unwrap();
202 self.0.put(key, value)?;
203 Ok(val)
204 })
205 }
206
207 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
208 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
209 }
210
211 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
212 fedimint_core::runtime::block_in_place(|| {
213 let val = self.0.snapshot().get(key).unwrap();
214 self.0.delete(key)?;
215 Ok(val)
216 })
217 }
218
219 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
220 Ok(fedimint_core::runtime::block_in_place(|| {
221 let prefix = key_prefix.to_vec();
222 let mut options = rocksdb::ReadOptions::default();
223 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
224 let iter = self.0.snapshot().iterator_opt(
225 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
226 options,
227 );
228 let rocksdb_iter = iter.map_while(move |res| {
229 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
230 key_bytes
231 .starts_with(&prefix)
232 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
233 });
234 Box::pin(convert_to_async_stream(rocksdb_iter))
235 }))
236 }
237
238 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
239 Ok(fedimint_core::runtime::block_in_place(|| {
240 let range = Range {
241 start: range.start.to_vec(),
242 end: range.end.to_vec(),
243 };
244 let mut options = rocksdb::ReadOptions::default();
245 options.set_iterate_range(range.clone());
246 let iter = self.0.snapshot().iterator_opt(
247 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
248 options,
249 );
250 let rocksdb_iter = iter.map_while(move |res| {
251 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
252 (key_bytes.as_ref() < range.end.as_slice())
253 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
254 });
255 Box::pin(convert_to_async_stream(rocksdb_iter))
256 }))
257 }
258
259 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
260 fedimint_core::runtime::block_in_place(|| {
261 let mut options = rocksdb::ReadOptions::default();
263 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
264 let iter = self
265 .0
266 .snapshot()
267 .iterator_opt(
268 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
269 options,
270 )
271 .map_while(|res| {
272 res.map(|(key_bytes, _)| {
273 key_bytes
274 .starts_with(key_prefix)
275 .then_some(key_bytes.to_vec())
276 })
277 .transpose()
278 });
279
280 for item in iter {
281 let key = item?;
282 self.0.delete(key)?;
283 }
284
285 Ok(())
286 })
287 }
288
289 async fn raw_find_by_prefix_sorted_descending(
290 &mut self,
291 key_prefix: &[u8],
292 ) -> Result<PrefixStream<'_>> {
293 let prefix = key_prefix.to_vec();
294 let next_prefix = next_prefix(&prefix);
295 let iterator_mode = if let Some(next_prefix) = &next_prefix {
296 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
297 } else {
298 rocksdb::IteratorMode::End
299 };
300 Ok(fedimint_core::runtime::block_in_place(|| {
301 let mut options = rocksdb::ReadOptions::default();
302 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
303 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
304 let rocksdb_iter = iter.map_while(move |res| {
305 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
306 key_bytes
307 .starts_with(&prefix)
308 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
309 });
310 Box::pin(convert_to_async_stream(rocksdb_iter))
311 }))
312 }
313}
314
315#[async_trait]
316impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
317 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
318 Ok(fedimint_core::runtime::block_in_place(|| {
319 self.0.rollback_to_savepoint()
320 })?)
321 }
322
323 async fn set_tx_savepoint(&mut self) -> Result<()> {
324 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
325
326 Ok(())
327 }
328}
329
330#[async_trait]
331impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
332 async fn commit_tx(self) -> Result<()> {
333 fedimint_core::runtime::block_in_place(|| {
334 self.0.commit()?;
335 Ok(())
336 })
337 }
338}
339
340#[async_trait]
341impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
342 async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
343 panic!("Cannot insert into a read only transaction");
344 }
345
346 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
347 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
348 }
349
350 async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
351 panic!("Cannot remove from a read only transaction");
352 }
353
354 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
355 Ok(fedimint_core::runtime::block_in_place(|| {
356 let range = Range {
357 start: range.start.to_vec(),
358 end: range.end.to_vec(),
359 };
360 let mut options = rocksdb::ReadOptions::default();
361 options.set_iterate_range(range.clone());
362 let iter = self.0.snapshot().iterator_opt(
363 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
364 options,
365 );
366 let rocksdb_iter = iter.map_while(move |res| {
367 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
368 (key_bytes.as_ref() < range.end.as_slice())
369 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
370 });
371 Box::pin(convert_to_async_stream(rocksdb_iter))
372 }))
373 }
374
375 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
376 Ok(fedimint_core::runtime::block_in_place(|| {
377 let prefix = key_prefix.to_vec();
378 let mut options = rocksdb::ReadOptions::default();
379 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
380 let iter = self.0.snapshot().iterator_opt(
381 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
382 options,
383 );
384 let rocksdb_iter = iter.map_while(move |res| {
385 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
386 key_bytes
387 .starts_with(&prefix)
388 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
389 });
390 Box::pin(convert_to_async_stream(rocksdb_iter))
391 }))
392 }
393
394 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
395 panic!("Cannot remove from a read only transaction");
396 }
397
398 async fn raw_find_by_prefix_sorted_descending(
399 &mut self,
400 key_prefix: &[u8],
401 ) -> Result<PrefixStream<'_>> {
402 let prefix = key_prefix.to_vec();
403 let next_prefix = next_prefix(&prefix);
404 let iterator_mode = if let Some(next_prefix) = &next_prefix {
405 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
406 } else {
407 rocksdb::IteratorMode::End
408 };
409 Ok(fedimint_core::runtime::block_in_place(|| {
410 let mut options = rocksdb::ReadOptions::default();
411 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
412 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
413 let rocksdb_iter = iter.map_while(move |res| {
414 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
415 key_bytes
416 .starts_with(&prefix)
417 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
418 });
419 Box::pin(stream::iter(rocksdb_iter))
420 }))
421 }
422}
423
424#[async_trait]
425impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
426 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
427 panic!("Cannot rollback a read only transaction");
428 }
429
430 async fn set_tx_savepoint(&mut self) -> Result<()> {
431 panic!("Cannot set a savepoint in a read only transaction");
432 }
433}
434
#[async_trait]
impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
    /// Always panics: a read-only transaction cannot be committed.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
441
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a read-write database in a new temporary directory whose name
    /// starts with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::open(dir).unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let dir = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();
        let raw_module_db = RocksDb::open(dir).unwrap();
        let module_db = Database::new(raw_module_db, ModuleDecoderRegistry::default());

        let db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_module_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(db, prefixed_module_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Plain increment of the last byte.
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        // Carry into the preceding bytes.
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // No successor exists.
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // Single-byte prefixes.
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        assert_eq!(next_prefix(&[255]), None);
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// The `TestKey` entries written by `test_retrieve_descending_order`, in
    /// the order a descending prefix query must return them.
    fn expected_test_entries() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    /// The `TestKey2` entries written by `test_retrieve_descending_order`, in
    /// the order a descending prefix query must return them.
    fn expected_max_test_entries() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();

        // Populate and query through the read-write handle; the scope drops it
        // before the same directory is reopened read-only.
        {
            let db = Database::new(
                RocksDb::open(&path).unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_test_entries());

            let found_max = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found_max, expected_max_test_entries());

            dbtx.commit_tx().await;
        }

        // The committed data must read back identically through the read-only
        // handle.
        let db_readonly = RocksDbReadOnly::open_read_only(path).unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_test_entries());

        let found_max = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found_max, expected_max_test_entries());
    }
}