1#[allow(unused_imports)] use super::api::Transaction;
3use super::Key;
4use super::KeyEncode;
5use super::Val;
6use super::Version;
7use crate::cf;
8use crate::dbs::node::Timestamp;
9use crate::doc::CursorValue;
10use crate::err::Error;
11use crate::idg::u32::U32;
12use crate::key::debug::Sprintable;
13use crate::kvs::batch::Batch;
14use crate::kvs::clock::SizedClock;
15#[cfg(any(
16 feature = "kv-tikv",
17 feature = "kv-fdb",
18 feature = "kv-indxdb",
19 feature = "kv-surrealcs",
20))]
21use crate::kvs::savepoint::SavePointImpl;
22use crate::kvs::stash::Stash;
23use crate::kvs::KeyDecode as _;
24use crate::sql;
25use crate::sql::thing::Thing;
26use crate::vs::VersionStamp;
27use sql::statements::DefineTableStatement;
28use std::fmt;
29use std::fmt::Debug;
30use std::ops::Range;
31use std::sync::Arc;
32
33const TARGET: &str = "surrealdb::core::kvs::tr";
34
/// Check level accepted by `Transactor::check_level`, which forwards it unchanged
/// to the active storage engine transaction.
///
/// NOTE(review): the variant names suggest "do nothing", "warn" and "raise an
/// error"; the actual handling lives in the storage engines and is not visible
/// from this file — confirm before relying on it.
#[derive(Debug, Default)]
pub enum Check {
    /// Value produced by `Check::default()`.
    #[default]
    None,
    Warn,
    Error,
}
43
/// Kind of transaction requested from a datastore: read or write.
#[derive(Copy, Clone)]
pub enum TransactionType {
    Read,
    Write,
}

/// Boolean shorthand: `true` selects [`TransactionType::Write`], `false` selects
/// [`TransactionType::Read`].
impl From<bool> for TransactionType {
    fn from(value: bool) -> Self {
        if value {
            Self::Write
        } else {
            Self::Read
        }
    }
}
59
/// Locking strategy requested for a transaction: pessimistic or optimistic.
#[derive(Copy, Clone)]
pub enum LockType {
    Pessimistic,
    Optimistic,
}

/// Boolean shorthand: `true` selects [`LockType::Pessimistic`], `false` selects
/// [`LockType::Optimistic`].
impl From<bool> for LockType {
    fn from(value: bool) -> Self {
        if value {
            Self::Pessimistic
        } else {
            Self::Optimistic
        }
    }
}
75
/// Wraps one engine-specific transaction together with the auxiliary state used
/// by the methods implemented on this type.
#[allow(dead_code)]
#[non_exhaustive]
pub struct Transactor {
    /// Engine-specific transaction that every key-value call is dispatched to.
    pub(super) inner: Inner,
    /// Holds id generators by encoded key; read and written by `get_idg` and
    /// the `get_next_*_id` / `remove_*_id` helpers.
    pub(super) stash: Stash,
    /// Change-feed writer: receives entries from `record_change` and
    /// `record_table_change`, and is read back in `complete_changes`.
    pub(super) cf: cf::Writer,
    /// Clock queried by `clock()`.
    pub(super) clock: Arc<SizedClock>,
}
85
/// The engine-specific transaction held by a `Transactor`.
///
/// Each variant exists only when the matching `kv-*` cargo feature is enabled,
/// so with no storage feature enabled this enum has no variants at all.
#[allow(clippy::large_enum_variant)]
pub(super) enum Inner {
    /// Transaction type from the `mem` module (feature `kv-mem`).
    #[cfg(feature = "kv-mem")]
    Mem(super::mem::Transaction),
    /// Transaction type from the `rocksdb` module (feature `kv-rocksdb`).
    #[cfg(feature = "kv-rocksdb")]
    RocksDB(super::rocksdb::Transaction),
    /// Transaction type from the `indxdb` module (feature `kv-indxdb`).
    #[cfg(feature = "kv-indxdb")]
    IndxDB(super::indxdb::Transaction),
    /// Transaction type from the `tikv` module (feature `kv-tikv`).
    #[cfg(feature = "kv-tikv")]
    TiKV(super::tikv::Transaction),
    /// Transaction type from the `fdb` module (feature `kv-fdb`).
    #[cfg(feature = "kv-fdb")]
    FoundationDB(super::fdb::Transaction),
    /// Transaction type from the `surrealkv` module (feature `kv-surrealkv`).
    #[cfg(feature = "kv-surrealkv")]
    SurrealKV(super::surrealkv::Transaction),
    /// Transaction type from the `surrealcs` module (feature `kv-surrealcs`).
    #[cfg(feature = "kv-surrealcs")]
    SurrealCS(super::surrealcs::Transaction),
}
103
104impl fmt::Display for Transactor {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 #![allow(unused_variables)]
107 match &self.inner {
108 #[cfg(feature = "kv-mem")]
109 Inner::Mem(_) => write!(f, "memory"),
110 #[cfg(feature = "kv-rocksdb")]
111 Inner::RocksDB(_) => write!(f, "rocksdb"),
112 #[cfg(feature = "kv-indxdb")]
113 Inner::IndxDB(_) => write!(f, "indxdb"),
114 #[cfg(feature = "kv-tikv")]
115 Inner::TiKV(_) => write!(f, "tikv"),
116 #[cfg(feature = "kv-fdb")]
117 Inner::FoundationDB(_) => write!(f, "fdb"),
118 #[cfg(feature = "kv-surrealkv")]
119 Inner::SurrealKV(_) => write!(f, "surrealkv"),
120 #[cfg(feature = "kv-surrealcs")]
121 Inner::SurrealCS(_) => write!(f, "surrealcs"),
122 #[allow(unreachable_patterns)]
123 _ => unreachable!(),
124 }
125 }
126}
127
// Expands to a `match` over every `Inner` variant that is compiled in, binding
// the wrapped engine transaction to the caller-supplied pattern and evaluating
// the same block in each arm.
//
// Usage: `expand_inner!(&mut self.inner, v => { v.commit().await })`
macro_rules! expand_inner {
    ( $v:expr, $arm:pat_param => $b:block ) => {
        match $v {
            #[cfg(feature = "kv-mem")]
            Inner::Mem($arm) => $b,
            #[cfg(feature = "kv-rocksdb")]
            Inner::RocksDB($arm) => $b,
            #[cfg(feature = "kv-indxdb")]
            Inner::IndxDB($arm) => $b,
            #[cfg(feature = "kv-tikv")]
            Inner::TiKV($arm) => $b,
            #[cfg(feature = "kv-fdb")]
            Inner::FoundationDB($arm) => $b,
            #[cfg(feature = "kv-surrealkv")]
            Inner::SurrealKV($arm) => $b,
            #[cfg(feature = "kv-surrealcs")]
            Inner::SurrealCS($arm) => $b,
            // Fallback arm so the match still compiles when the set of enabled
            // engine features leaves other arms out.
            #[allow(unreachable_patterns)]
            _ => unreachable!(),
        }
    };
}
150
151impl Transactor {
152 #![cfg_attr(
154 not(any(
155 feature = "kv-mem",
156 feature = "kv-rocksdb",
157 feature = "kv-indxdb",
158 feature = "kv-tikv",
159 feature = "kv-fdb",
160 feature = "kv-surrealkv",
161 )),
162 allow(unused_variables)
163 )]
164 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
177 pub(crate) fn check_level(&mut self, check: Check) {
178 expand_inner!(&mut self.inner, v => { v.check_level(check) })
179 }
180
181 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
188 pub async fn closed(&self) -> bool {
189 trace!(target: TARGET, "Closed");
190 expand_inner!(&self.inner, v => { v.closed() })
191 }
192
193 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
197 pub async fn cancel(&mut self) -> Result<(), Error> {
198 trace!(target: TARGET, "Cancel");
199 expand_inner!(&mut self.inner, v => { v.cancel().await })
200 }
201
202 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
206 pub async fn commit(&mut self) -> Result<(), Error> {
207 trace!(target: TARGET, "Commit");
208 expand_inner!(&mut self.inner, v => { v.commit().await })
209 }
210
211 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
213 pub async fn exists<K>(&mut self, key: K, version: Option<u64>) -> Result<bool, Error>
214 where
215 K: KeyEncode + Debug,
216 {
217 let key = key.encode_owned()?;
218 trace!(target: TARGET, key = key.sprint(), version = version, "Exists");
219 expand_inner!(&mut self.inner, v => { v.exists(key, version).await })
220 }
221
222 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
224 pub async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
225 where
226 K: KeyEncode + Debug,
227 {
228 let key = key.encode_owned()?;
229 trace!(target: TARGET, key = key.sprint(), version = version, "Get");
230 expand_inner!(&mut self.inner, v => { v.get(key, version).await })
231 }
232
233 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
235 pub async fn getm<K>(&mut self, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
236 where
237 K: KeyEncode + Debug,
238 {
239 let mut keys_encoded = Vec::new();
240 for k in keys {
241 keys_encoded.push(k.encode_owned()?);
242 }
243 trace!(target: TARGET, keys = keys_encoded.sprint(), "GetM");
244 expand_inner!(&mut self.inner, v => { v.getm(keys_encoded).await })
245 }
246
247 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
251 pub async fn getr<K>(
252 &mut self,
253 rng: Range<K>,
254 version: Option<u64>,
255 ) -> Result<Vec<(Key, Val)>, Error>
256 where
257 K: KeyEncode + Debug,
258 {
259 let beg: Key = rng.start.encode_owned()?;
260 let end: Key = rng.end.encode_owned()?;
261 let rng = beg.as_slice()..end.as_slice();
262 trace!(target: TARGET, rng = rng.sprint(), version = version, "GetR");
263 expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await })
264 }
265
266 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
270 pub async fn getp<K>(&mut self, key: K) -> Result<Vec<(Key, Val)>, Error>
271 where
272 K: KeyEncode + Debug,
273 {
274 let key = key.encode_owned()?;
275 trace!(target: TARGET, key = key.sprint(), "GetP");
276 expand_inner!(&mut self.inner, v => { v.getp(key).await })
277 }
278
279 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
281 pub async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
282 where
283 K: KeyEncode + Debug,
284 V: Into<Val> + Debug,
285 {
286 let key = key.encode_owned()?;
287 trace!(target: TARGET, key = key.sprint(), version = version, "Set");
288 expand_inner!(&mut self.inner, v => { v.set(key, val, version).await })
289 }
290
291 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
293 pub async fn replace<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
294 where
295 K: KeyEncode + Debug,
296 V: Into<Val> + Debug,
297 {
298 let key = key.encode_owned()?;
299 trace!(target: TARGET, key = key.sprint(), "Replace");
300 expand_inner!(&mut self.inner, v => { v.replace(key, val).await })
301 }
302
303 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
305 pub async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
306 where
307 K: KeyEncode + Debug,
308 V: Into<Val> + Debug,
309 {
310 let key = key.encode_owned()?;
311 trace!(target: TARGET, key = key.sprint(), version = version, "Put");
312 expand_inner!(&mut self.inner, v => { v.put(key, val, version).await })
313 }
314
315 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
317 pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
318 where
319 K: KeyEncode + Debug,
320 V: Into<Val> + Debug,
321 {
322 let key = key.encode_owned()?;
323 trace!(target: TARGET, key = key.sprint(), "PutC");
324 expand_inner!(&mut self.inner, v => { v.putc(key, val, chk).await })
325 }
326
327 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
329 pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
330 where
331 K: KeyEncode + Debug,
332 {
333 let key = key.encode_owned()?;
334 trace!(target: TARGET, key = key.sprint(), "Del");
335 expand_inner!(&mut self.inner, v => { v.del(key).await })
336 }
337
338 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
340 pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
341 where
342 K: KeyEncode + Debug,
343 V: Into<Val> + Debug,
344 {
345 let key = key.encode_owned()?;
346 trace!(target: TARGET, key = key.sprint(), "DelC");
347 expand_inner!(&mut self.inner, v => { v.delc(key, chk).await })
348 }
349
350 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
354 pub async fn delr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
355 where
356 K: KeyEncode + Debug,
357 {
358 let beg: Key = rng.start.encode_owned()?;
359 let end: Key = rng.end.encode_owned()?;
360 let rng = beg.as_slice()..end.as_slice();
361 trace!(target: TARGET, rng = rng.sprint(), "DelR");
362 expand_inner!(&mut self.inner, v => { v.delr(beg..end).await })
363 }
364
365 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
369 pub async fn delp<K>(&mut self, key: K) -> Result<(), Error>
370 where
371 K: KeyEncode + Debug,
372 {
373 let key = key.encode_owned()?;
374 trace!(target: TARGET, key = key.sprint(), "DelP");
375 expand_inner!(&mut self.inner, v => { v.delp(key).await })
376 }
377
378 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
380 pub async fn clr<K>(&mut self, key: K) -> Result<(), Error>
381 where
382 K: KeyEncode + Debug,
383 {
384 let key = key.encode_owned()?;
385 trace!(target: TARGET, key = key.sprint(), "Clr");
386 expand_inner!(&mut self.inner, v => { v.clr(key).await })
387 }
388
389 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
391 pub async fn clrc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
392 where
393 K: KeyEncode + Debug,
394 V: Into<Val> + Debug,
395 {
396 let key = key.encode_owned()?;
397 trace!(target: TARGET, key = key.sprint(), "ClrC");
398 expand_inner!(&mut self.inner, v => { v.clrc(key, chk).await })
399 }
400
401 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
405 pub async fn clrr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
406 where
407 K: KeyEncode + Debug,
408 {
409 let beg: Key = rng.start.encode_owned()?;
410 let end: Key = rng.end.encode_owned()?;
411 let rng = beg.as_slice()..end.as_slice();
412 trace!(target: TARGET, rng = rng.sprint(), "ClrR");
413 expand_inner!(&mut self.inner, v => { v.clrr(beg..end).await })
414 }
415
416 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
420 pub async fn clrp<K>(&mut self, key: K) -> Result<(), Error>
421 where
422 K: KeyEncode + Debug,
423 {
424 let key: Key = key.encode_owned()?;
425 trace!(target: TARGET, key = key.sprint(), "ClrP");
426 expand_inner!(&mut self.inner, v => { v.clrp(key).await })
427 }
428
429 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
433 pub async fn keys<K>(
434 &mut self,
435 rng: Range<K>,
436 limit: u32,
437 version: Option<u64>,
438 ) -> Result<Vec<Key>, Error>
439 where
440 K: KeyEncode + Debug,
441 {
442 let beg: Key = rng.start.encode_owned()?;
443 let end: Key = rng.end.encode_owned()?;
444 let rng = beg.as_slice()..end.as_slice();
445 trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Keys");
446 if beg > end {
447 return Ok(vec![]);
448 }
449 expand_inner!(&mut self.inner, v => { v.keys(beg..end, limit, version).await })
450 }
451
452 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
456 pub async fn scan<K>(
457 &mut self,
458 rng: Range<K>,
459 limit: u32,
460 version: Option<u64>,
461 ) -> Result<Vec<(Key, Val)>, Error>
462 where
463 K: KeyEncode + Debug,
464 {
465 let beg: Key = rng.start.encode_owned()?;
466 let end: Key = rng.end.encode_owned()?;
467 let rng = beg.as_slice()..end.as_slice();
468 trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scan");
469 if beg > end {
470 return Ok(vec![]);
471 }
472 expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit, version).await })
473 }
474
475 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
479 pub async fn batch_keys<K>(
480 &mut self,
481 rng: Range<K>,
482 batch: u32,
483 version: Option<u64>,
484 ) -> Result<Batch<Key>, Error>
485 where
486 K: KeyEncode + Debug,
487 {
488 let beg: Key = rng.start.encode_owned()?;
489 let end: Key = rng.end.encode_owned()?;
490 let rng = beg.as_slice()..end.as_slice();
491 trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
492 expand_inner!(&mut self.inner, v => { v.batch_keys(beg..end, batch, version).await })
493 }
494
495 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
499 pub async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
500 where
501 K: KeyEncode + Debug,
502 {
503 let beg: Key = rng.start.encode_owned()?;
504 let end: Key = rng.end.encode_owned()?;
505 let rng = beg.as_slice()..end.as_slice();
506 trace!(target: TARGET, rng = rng.sprint(), "Count");
507 expand_inner!(&mut self.inner, v => { v.count(beg..end).await })
508 }
509
510 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
514 pub async fn batch_keys_vals<K>(
515 &mut self,
516 rng: Range<K>,
517 batch: u32,
518 version: Option<u64>,
519 ) -> Result<Batch<(Key, Val)>, Error>
520 where
521 K: KeyEncode + Debug,
522 {
523 let beg: Key = rng.start.encode_owned()?;
524 let end: Key = rng.end.encode_owned()?;
525 let rng = beg.as_slice()..end.as_slice();
526 trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
527 expand_inner!(&mut self.inner, v => { v.batch_keys_vals(beg..end, batch, version).await })
528 }
529
530 #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
534 pub async fn batch_keys_vals_versions<K>(
535 &mut self,
536 rng: Range<K>,
537 batch: u32,
538 ) -> Result<Batch<(Key, Val, Version, bool)>, Error>
539 where
540 K: KeyEncode + Debug,
541 {
542 let beg: Key = rng.start.encode_owned()?;
543 let end: Key = rng.end.encode_owned()?;
544 let rng = beg.as_slice()..end.as_slice();
545 trace!(target: TARGET, rng = rng.sprint(), "BatchVersions");
546 expand_inner!(&mut self.inner, v => { v.batch_keys_vals_versions(beg..end, batch).await })
547 }
548
549 pub async fn get_timestamp<K>(&mut self, key: K) -> Result<VersionStamp, Error>
555 where
556 K: KeyEncode + Debug,
557 {
558 let key = key.encode_owned()?;
559 expand_inner!(&mut self.inner, v => { v.get_timestamp(key).await })
560 }
561
562 pub async fn set_versionstamped<K, V>(
564 &mut self,
565 ts_key: K,
566 prefix: K,
567 suffix: K,
568 val: V,
569 ) -> Result<(), Error>
570 where
571 K: KeyEncode + Debug,
572 V: Into<Val> + Debug,
573 {
574 let ts_key = ts_key.encode_owned()?;
575 let prefix = prefix.encode_owned()?;
576 let suffix = suffix.encode_owned()?;
577 expand_inner!(&mut self.inner, v => { v.set_versionstamp(ts_key, prefix, suffix, val).await })
578 }
579
580 pub async fn clock(&self) -> Timestamp {
593 self.clock.now().await
594 }
595
596 #[allow(clippy::too_many_arguments)]
600 pub(crate) fn record_change(
601 &mut self,
602 ns: &str,
603 db: &str,
604 tb: &str,
605 id: &Thing,
606 previous: CursorValue,
607 current: CursorValue,
608 store_difference: bool,
609 ) {
610 self.cf.record_cf_change(ns, db, tb, id.clone(), previous, current, store_difference)
611 }
612
613 pub(crate) fn record_table_change(
615 &mut self,
616 ns: &str,
617 db: &str,
618 tb: &str,
619 dt: &DefineTableStatement,
620 ) {
621 self.cf.define_table(ns, db, tb, dt)
622 }
623
624 pub(crate) async fn get_idg(&mut self, key: &Key) -> Result<U32, Error> {
625 Ok(if let Some(v) = self.stash.get(key) {
626 v
627 } else {
628 let val = self.get(key.clone(), None).await?;
629 if let Some(val) = val {
630 U32::new(key.clone(), Some(val)).await?
631 } else {
632 U32::new(key.clone(), None).await?
633 }
634 })
635 }
636
637 pub(crate) async fn get_next_ns_id(&mut self) -> Result<u32, Error> {
639 let key = crate::key::root::ni::Ni::default().encode_owned()?;
640 let mut seq = self.get_idg(&key).await?;
641 let nid = seq.get_next_id();
642 self.stash.set(key, seq.clone());
643 let (k, v) = seq.finish().unwrap();
644 self.replace(k, v).await?;
645 Ok(nid)
646 }
647
648 pub(crate) async fn get_next_db_id(&mut self, ns: u32) -> Result<u32, Error> {
650 let key = crate::key::namespace::di::new(ns).encode_owned()?;
651 let mut seq = self.get_idg(&key).await?;
652 let nid = seq.get_next_id();
653 self.stash.set(key, seq.clone());
654 let (k, v) = seq.finish().unwrap();
655 self.replace(k, v).await?;
656 Ok(nid)
657 }
658
659 pub(crate) async fn get_next_tb_id(&mut self, ns: u32, db: u32) -> Result<u32, Error> {
661 let key = crate::key::database::ti::new(ns, db).encode_owned()?;
662 let mut seq = self.get_idg(&key).await?;
663 let nid = seq.get_next_id();
664 self.stash.set(key, seq.clone());
665 let (k, v) = seq.finish().unwrap();
666 self.replace(k, v).await?;
667 Ok(nid)
668 }
669
670 #[allow(unused)]
672 pub(crate) async fn remove_ns_id(&mut self, ns: u32) -> Result<(), Error> {
673 let key = crate::key::root::ni::Ni::default().encode_owned()?;
674 let mut seq = self.get_idg(&key).await?;
675 seq.remove_id(ns);
676 self.stash.set(key, seq.clone());
677 let (k, v) = seq.finish().unwrap();
678 self.replace(k, v).await?;
679 Ok(())
680 }
681
682 #[allow(unused)]
684 pub(crate) async fn remove_db_id(&mut self, ns: u32, db: u32) -> Result<(), Error> {
685 let key = crate::key::namespace::di::new(ns).encode_owned()?;
686 let mut seq = self.get_idg(&key).await?;
687 seq.remove_id(db);
688 self.stash.set(key, seq.clone());
689 let (k, v) = seq.finish().unwrap();
690 self.replace(k, v).await?;
691 Ok(())
692 }
693
694 #[allow(unused)]
696 pub(crate) async fn remove_tb_id(&mut self, ns: u32, db: u32, tb: u32) -> Result<(), Error> {
697 let key = crate::key::database::ti::new(ns, db).encode_owned()?;
698 let mut seq = self.get_idg(&key).await?;
699 seq.remove_id(tb);
700 self.stash.set(key, seq.clone());
701 let (k, v) = seq.finish().unwrap();
702 self.replace(k, v).await?;
703 Ok(())
704 }
705
706 pub(crate) async fn complete_changes(&mut self, _lock: bool) -> Result<(), Error> {
723 let changes = self.cf.get()?;
724 for (tskey, prefix, suffix, v) in changes {
725 self.set_versionstamped(tskey, prefix, suffix, v).await?
726 }
727 Ok(())
728 }
729
730 pub(crate) async fn set_timestamp_for_versionstamp(
733 &mut self,
734 ts: u64,
735 ns: &str,
736 db: &str,
737 ) -> Result<VersionStamp, Error> {
738 let key = crate::key::database::vs::new(ns, db);
741 let vst = self.get_timestamp(key).await?;
742 trace!(
743 target: TARGET,
744 "Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}",
745 ts,
746 vst.into_u64_lossy(),
747 ns,
748 db
749 );
750
751 let mut ts_key = crate::key::database::ts::new(ns, db, ts);
754 let begin = ts_key.encode()?;
755 let end = crate::key::database::ts::suffix(ns, db)?;
756 let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, None).await?;
757 let latest_ts_pair = ts_pairs.last();
758 if let Some((k, _)) = latest_ts_pair {
759 trace!(
760 target: TARGET,
761 "There already was a greater committed timestamp {} in ns: {}, db: {} found: {}",
762 ts,
763 ns,
764 db,
765 k.sprint()
766 );
767 let k = crate::key::database::ts::Ts::decode(k)?;
768 let latest_ts = k.ts;
769 if latest_ts >= ts {
770 warn!("ts {ts} is less than the latest ts {latest_ts}");
771 ts_key = crate::key::database::ts::new(ns, db, latest_ts + 1);
772 }
773 }
774 self.replace(ts_key, vst.as_bytes()).await?;
775 Ok(vst)
776 }
777
778 pub(crate) async fn get_versionstamp_from_timestamp(
779 &mut self,
780 ts: u64,
781 ns: &str,
782 db: &str,
783 ) -> Result<Option<VersionStamp>, Error> {
784 let start = crate::key::database::ts::prefix(ns, db)?;
785 let ts_key = crate::key::database::ts::new(ns, db, ts + 1).encode_owned()?;
786 let end = ts_key.encode_owned()?;
787 let ts_pairs = self.getr(start..end, None).await?;
788 let latest_ts_pair = ts_pairs.last();
789 if let Some((_, v)) = latest_ts_pair {
790 return Ok(Some(VersionStamp::from_slice(v)?));
791 }
792 Ok(None)
793 }
794
795 pub(crate) async fn new_save_point(&mut self) {
796 expand_inner!(&mut self.inner, v => { v.new_save_point() })
797 }
798
799 pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
800 expand_inner!(&mut self.inner, v => { v.rollback_to_save_point().await })
801 }
802
803 pub(crate) async fn release_last_save_point(&mut self) -> Result<(), Error> {
804 expand_inner!(&mut self.inner, v => { v.release_last_save_point() })
805 }
806}