surrealdb_core/kvs/
tr.rs

1#[allow(unused_imports)] // not used when non of the storage backends are enabled.
2use super::api::Transaction;
3use super::Key;
4use super::KeyEncode;
5use super::Val;
6use super::Version;
7use crate::cf;
8use crate::dbs::node::Timestamp;
9use crate::doc::CursorValue;
10use crate::err::Error;
11use crate::idg::u32::U32;
12use crate::key::debug::Sprintable;
13use crate::kvs::batch::Batch;
14use crate::kvs::clock::SizedClock;
15#[cfg(any(
16	feature = "kv-tikv",
17	feature = "kv-fdb",
18	feature = "kv-indxdb",
19	feature = "kv-surrealcs",
20))]
21use crate::kvs::savepoint::SavePointImpl;
22use crate::kvs::stash::Stash;
23use crate::kvs::KeyDecode as _;
24use crate::sql;
25use crate::sql::thing::Thing;
26use crate::vs::VersionStamp;
27use sql::statements::DefineTableStatement;
28use std::fmt;
29use std::fmt::Debug;
30use std::ops::Range;
31use std::sync::Arc;
32
/// Tracing target attached to the `trace!` events emitted by the methods in this module.
const TARGET: &str = "surrealdb::core::kvs::tr";
34
/// Used to determine the behaviour when a transaction is not closed correctly
///
/// The chosen level is handed to the backend transaction through
/// `Transactor::check_level`; the reaction itself is implemented by each backend.
#[derive(Debug, Default)]
pub enum Check {
	/// The default level.
	/// NOTE(review): presumably unclosed transactions are silently ignored — confirm in the backends.
	#[default]
	None,
	/// NOTE(review): presumably an unclosed transaction only logs a warning — confirm in the backends.
	Warn,
	/// NOTE(review): presumably an unclosed transaction is treated as a hard failure — confirm in the backends.
	Error,
}
43
/// Whether a transaction may only read from the datastore, or may also write to it.
#[derive(Copy, Clone)]
pub enum TransactionType {
	/// The transaction is read-only.
	Read,
	/// The transaction is writeable.
	Write,
}

impl From<bool> for TransactionType {
	/// Converts a "writeable" flag: `true` becomes [`TransactionType::Write`],
	/// `false` becomes [`TransactionType::Read`].
	fn from(value: bool) -> Self {
		if value {
			Self::Write
		} else {
			Self::Read
		}
	}
}
59
/// The locking strategy of a transaction: pessimistic or optimistic.
#[derive(Copy, Clone)]
pub enum LockType {
	/// The transaction uses pessimistic locking.
	Pessimistic,
	/// The transaction uses optimistic locking.
	Optimistic,
}

impl From<bool> for LockType {
	/// Converts a "pessimistic" flag: `true` becomes [`LockType::Pessimistic`],
	/// `false` becomes [`LockType::Optimistic`].
	fn from(value: bool) -> Self {
		if value {
			Self::Pessimistic
		} else {
			Self::Optimistic
		}
	}
}
75
/// A set of undoable updates and requests against a dataset.
///
/// Wraps one backend-specific transaction and forwards every key-value
/// operation to it, while also carrying the per-transaction id-generator
/// stash, the change feed writer and the clock.
#[allow(dead_code)]
#[non_exhaustive]
pub struct Transactor {
	/// The backend-specific transaction that every operation is dispatched to.
	pub(super) inner: Inner,
	/// Id generators loaded during this transaction, looked up by their key.
	pub(super) stash: Stash,
	/// Buffer of change feed entries recorded during this transaction.
	pub(super) cf: cf::Writer,
	/// Clock used to obtain the current timestamp.
	pub(super) clock: Arc<SizedClock>,
}
85
// The concrete transaction of whichever storage backend is in use.
// Each variant only exists when the matching cargo feature is enabled.
#[allow(clippy::large_enum_variant)]
pub(super) enum Inner {
	// In-memory backend (`kv-mem`).
	#[cfg(feature = "kv-mem")]
	Mem(super::mem::Transaction),
	// RocksDB backend (`kv-rocksdb`).
	#[cfg(feature = "kv-rocksdb")]
	RocksDB(super::rocksdb::Transaction),
	// IndxDB backend (`kv-indxdb`).
	#[cfg(feature = "kv-indxdb")]
	IndxDB(super::indxdb::Transaction),
	// TiKV backend (`kv-tikv`).
	#[cfg(feature = "kv-tikv")]
	TiKV(super::tikv::Transaction),
	// FoundationDB backend (`kv-fdb`).
	#[cfg(feature = "kv-fdb")]
	FoundationDB(super::fdb::Transaction),
	// SurrealKV backend (`kv-surrealkv`).
	#[cfg(feature = "kv-surrealkv")]
	SurrealKV(super::surrealkv::Transaction),
	// SurrealCS backend (`kv-surrealcs`).
	#[cfg(feature = "kv-surrealcs")]
	SurrealCS(super::surrealcs::Transaction),
}
103
104impl fmt::Display for Transactor {
105	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106		#![allow(unused_variables)]
107		match &self.inner {
108			#[cfg(feature = "kv-mem")]
109			Inner::Mem(_) => write!(f, "memory"),
110			#[cfg(feature = "kv-rocksdb")]
111			Inner::RocksDB(_) => write!(f, "rocksdb"),
112			#[cfg(feature = "kv-indxdb")]
113			Inner::IndxDB(_) => write!(f, "indxdb"),
114			#[cfg(feature = "kv-tikv")]
115			Inner::TiKV(_) => write!(f, "tikv"),
116			#[cfg(feature = "kv-fdb")]
117			Inner::FoundationDB(_) => write!(f, "fdb"),
118			#[cfg(feature = "kv-surrealkv")]
119			Inner::SurrealKV(_) => write!(f, "surrealkv"),
120			#[cfg(feature = "kv-surrealcs")]
121			Inner::SurrealCS(_) => write!(f, "surrealcs"),
122			#[allow(unreachable_patterns)]
123			_ => unreachable!(),
124		}
125	}
126}
127
// Dispatches a block of code to whichever storage backend an `Inner` value holds.
//
// `$v` is the `Inner` value (or a reference to it) to match on, `$arm` is the
// pattern that binds the backend transaction, and `$b` is the block evaluated
// with that binding. One arm is generated per backend whose cargo feature is
// enabled, so the same block is type-checked against every compiled-in backend.
macro_rules! expand_inner {
	( $v:expr, $arm:pat_param => $b:block ) => {
		match $v {
			#[cfg(feature = "kv-mem")]
			Inner::Mem($arm) => $b,
			#[cfg(feature = "kv-rocksdb")]
			Inner::RocksDB($arm) => $b,
			#[cfg(feature = "kv-indxdb")]
			Inner::IndxDB($arm) => $b,
			#[cfg(feature = "kv-tikv")]
			Inner::TiKV($arm) => $b,
			#[cfg(feature = "kv-fdb")]
			Inner::FoundationDB($arm) => $b,
			#[cfg(feature = "kv-surrealkv")]
			Inner::SurrealKV($arm) => $b,
			#[cfg(feature = "kv-surrealcs")]
			Inner::SurrealCS($arm) => $b,
			// Catch-all so the match stays well-formed whichever subset of
			// backends is compiled in; it is redundant when all arms exist.
			#[allow(unreachable_patterns)]
			_ => unreachable!(),
		}
	};
}
150
151impl Transactor {
152	// Allow unused_variables when no storage is enabled as none of the values are used then.
153	#![cfg_attr(
154		not(any(
155			feature = "kv-mem",
156			feature = "kv-rocksdb",
157			feature = "kv-indxdb",
158			feature = "kv-tikv",
159			feature = "kv-fdb",
160			feature = "kv-surrealkv",
161		)),
162		allow(unused_variables)
163	)]
164	// --------------------------------------------------
165	// Integral methods
166	// --------------------------------------------------
167
168	/// Specify how we should handle unclosed transactions.
169	///
170	/// If a transaction is not cancelled or rolled back then
171	/// this can cause issues on some storage engine
172	/// implementations. In tests we can ignore unhandled
173	/// transactions, whilst in development we should panic
174	/// so that any unintended behaviour is detected, and in
175	/// production we should only log a warning.
176	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
177	pub(crate) fn check_level(&mut self, check: Check) {
178		expand_inner!(&mut self.inner, v => { v.check_level(check) })
179	}
180
181	/// Check if transaction is finished.
182	///
183	/// If the transaction has been cancelled or committed,
184	/// then this function will return [`true`], and any further
185	/// calls to functions on this transaction will result
186	/// in a [`Error::TxFinished`] error.
187	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
188	pub async fn closed(&self) -> bool {
189		trace!(target: TARGET, "Closed");
190		expand_inner!(&self.inner, v => { v.closed() })
191	}
192
193	/// Cancel a transaction.
194	///
195	/// This reverses all changes made within the transaction.
196	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
197	pub async fn cancel(&mut self) -> Result<(), Error> {
198		trace!(target: TARGET, "Cancel");
199		expand_inner!(&mut self.inner, v => { v.cancel().await })
200	}
201
202	/// Commit a transaction.
203	///
204	/// This attempts to commit all changes made within the transaction.
205	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
206	pub async fn commit(&mut self) -> Result<(), Error> {
207		trace!(target: TARGET, "Commit");
208		expand_inner!(&mut self.inner, v => { v.commit().await })
209	}
210
211	/// Check if a key exists in the datastore.
212	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
213	pub async fn exists<K>(&mut self, key: K, version: Option<u64>) -> Result<bool, Error>
214	where
215		K: KeyEncode + Debug,
216	{
217		let key = key.encode_owned()?;
218		trace!(target: TARGET, key = key.sprint(), version = version, "Exists");
219		expand_inner!(&mut self.inner, v => { v.exists(key, version).await })
220	}
221
222	/// Fetch a key from the datastore.
223	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
224	pub async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
225	where
226		K: KeyEncode + Debug,
227	{
228		let key = key.encode_owned()?;
229		trace!(target: TARGET, key = key.sprint(), version = version, "Get");
230		expand_inner!(&mut self.inner, v => { v.get(key, version).await })
231	}
232
233	/// Fetch many keys from the datastore.
234	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
235	pub async fn getm<K>(&mut self, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
236	where
237		K: KeyEncode + Debug,
238	{
239		let mut keys_encoded = Vec::new();
240		for k in keys {
241			keys_encoded.push(k.encode_owned()?);
242		}
243		trace!(target: TARGET, keys = keys_encoded.sprint(), "GetM");
244		expand_inner!(&mut self.inner, v => { v.getm(keys_encoded).await })
245	}
246
247	/// Retrieve a specific range of keys from the datastore.
248	///
249	/// This function fetches all matching key-value pairs from the underlying datastore in grouped batches.
250	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
251	pub async fn getr<K>(
252		&mut self,
253		rng: Range<K>,
254		version: Option<u64>,
255	) -> Result<Vec<(Key, Val)>, Error>
256	where
257		K: KeyEncode + Debug,
258	{
259		let beg: Key = rng.start.encode_owned()?;
260		let end: Key = rng.end.encode_owned()?;
261		let rng = beg.as_slice()..end.as_slice();
262		trace!(target: TARGET, rng = rng.sprint(), version = version, "GetR");
263		expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await })
264	}
265
266	/// Retrieve a specific prefixed range of keys from the datastore.
267	///
268	/// This function fetches all matching key-value pairs from the underlying datastore in grouped batches.
269	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
270	pub async fn getp<K>(&mut self, key: K) -> Result<Vec<(Key, Val)>, Error>
271	where
272		K: KeyEncode + Debug,
273	{
274		let key = key.encode_owned()?;
275		trace!(target: TARGET, key = key.sprint(), "GetP");
276		expand_inner!(&mut self.inner, v => { v.getp(key).await })
277	}
278
279	/// Insert or update a key in the datastore.
280	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
281	pub async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
282	where
283		K: KeyEncode + Debug,
284		V: Into<Val> + Debug,
285	{
286		let key = key.encode_owned()?;
287		trace!(target: TARGET, key = key.sprint(), version = version, "Set");
288		expand_inner!(&mut self.inner, v => { v.set(key, val, version).await })
289	}
290
291	/// Insert or replace a key in the datastore.
292	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
293	pub async fn replace<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
294	where
295		K: KeyEncode + Debug,
296		V: Into<Val> + Debug,
297	{
298		let key = key.encode_owned()?;
299		trace!(target: TARGET, key = key.sprint(), "Replace");
300		expand_inner!(&mut self.inner, v => { v.replace(key, val).await })
301	}
302
303	/// Insert a key if it doesn't exist in the datastore.
304	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
305	pub async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
306	where
307		K: KeyEncode + Debug,
308		V: Into<Val> + Debug,
309	{
310		let key = key.encode_owned()?;
311		trace!(target: TARGET, key = key.sprint(), version = version, "Put");
312		expand_inner!(&mut self.inner, v => { v.put(key, val, version).await })
313	}
314
315	/// Update a key in the datastore if the current value matches a condition.
316	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
317	pub async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
318	where
319		K: KeyEncode + Debug,
320		V: Into<Val> + Debug,
321	{
322		let key = key.encode_owned()?;
323		trace!(target: TARGET, key = key.sprint(), "PutC");
324		expand_inner!(&mut self.inner, v => { v.putc(key, val, chk).await })
325	}
326
327	/// Delete a key from the datastore.
328	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
329	pub async fn del<K>(&mut self, key: K) -> Result<(), Error>
330	where
331		K: KeyEncode + Debug,
332	{
333		let key = key.encode_owned()?;
334		trace!(target: TARGET, key = key.sprint(), "Del");
335		expand_inner!(&mut self.inner, v => { v.del(key).await })
336	}
337
338	/// Delete a key from the datastore if the current value matches a condition.
339	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
340	pub async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
341	where
342		K: KeyEncode + Debug,
343		V: Into<Val> + Debug,
344	{
345		let key = key.encode_owned()?;
346		trace!(target: TARGET, key = key.sprint(), "DelC");
347		expand_inner!(&mut self.inner, v => { v.delc(key, chk).await })
348	}
349
350	/// Delete a range of keys from the datastore.
351	///
352	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
353	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
354	pub async fn delr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
355	where
356		K: KeyEncode + Debug,
357	{
358		let beg: Key = rng.start.encode_owned()?;
359		let end: Key = rng.end.encode_owned()?;
360		let rng = beg.as_slice()..end.as_slice();
361		trace!(target: TARGET, rng = rng.sprint(), "DelR");
362		expand_inner!(&mut self.inner, v => { v.delr(beg..end).await })
363	}
364
365	/// Delete a prefixed range of keys from the datastore.
366	///
367	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
368	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
369	pub async fn delp<K>(&mut self, key: K) -> Result<(), Error>
370	where
371		K: KeyEncode + Debug,
372	{
373		let key = key.encode_owned()?;
374		trace!(target: TARGET, key = key.sprint(), "DelP");
375		expand_inner!(&mut self.inner, v => { v.delp(key).await })
376	}
377
378	/// Delete all versions of a key from the datastore.
379	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
380	pub async fn clr<K>(&mut self, key: K) -> Result<(), Error>
381	where
382		K: KeyEncode + Debug,
383	{
384		let key = key.encode_owned()?;
385		trace!(target: TARGET, key = key.sprint(), "Clr");
386		expand_inner!(&mut self.inner, v => { v.clr(key).await })
387	}
388
389	/// Delete all versions of a key from the datastore if the current value matches a condition.
390	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
391	pub async fn clrc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
392	where
393		K: KeyEncode + Debug,
394		V: Into<Val> + Debug,
395	{
396		let key = key.encode_owned()?;
397		trace!(target: TARGET, key = key.sprint(), "ClrC");
398		expand_inner!(&mut self.inner, v => { v.clrc(key, chk).await })
399	}
400
401	/// Delete all versions of a range of keys from the datastore.
402	///
403	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
404	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
405	pub async fn clrr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
406	where
407		K: KeyEncode + Debug,
408	{
409		let beg: Key = rng.start.encode_owned()?;
410		let end: Key = rng.end.encode_owned()?;
411		let rng = beg.as_slice()..end.as_slice();
412		trace!(target: TARGET, rng = rng.sprint(), "ClrR");
413		expand_inner!(&mut self.inner, v => { v.clrr(beg..end).await })
414	}
415
416	/// Delete all versions of a prefixed range of keys from the datastore.
417	///
418	/// This function deletes all matching key-value pairs from the underlying datastore in grouped batches.
419	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
420	pub async fn clrp<K>(&mut self, key: K) -> Result<(), Error>
421	where
422		K: KeyEncode + Debug,
423	{
424		let key: Key = key.encode_owned()?;
425		trace!(target: TARGET, key = key.sprint(), "ClrP");
426		expand_inner!(&mut self.inner, v => { v.clrp(key).await })
427	}
428
429	/// Retrieve a specific range of keys from the datastore.
430	///
431	/// This function fetches the full range of keys without values, in a single request to the underlying datastore.
432	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
433	pub async fn keys<K>(
434		&mut self,
435		rng: Range<K>,
436		limit: u32,
437		version: Option<u64>,
438	) -> Result<Vec<Key>, Error>
439	where
440		K: KeyEncode + Debug,
441	{
442		let beg: Key = rng.start.encode_owned()?;
443		let end: Key = rng.end.encode_owned()?;
444		let rng = beg.as_slice()..end.as_slice();
445		trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Keys");
446		if beg > end {
447			return Ok(vec![]);
448		}
449		expand_inner!(&mut self.inner, v => { v.keys(beg..end, limit, version).await })
450	}
451
452	/// Retrieve a specific range of keys from the datastore.
453	///
454	/// This function fetches the full range of key-value pairs, in a single request to the underlying datastore.
455	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
456	pub async fn scan<K>(
457		&mut self,
458		rng: Range<K>,
459		limit: u32,
460		version: Option<u64>,
461	) -> Result<Vec<(Key, Val)>, Error>
462	where
463		K: KeyEncode + Debug,
464	{
465		let beg: Key = rng.start.encode_owned()?;
466		let end: Key = rng.end.encode_owned()?;
467		let rng = beg.as_slice()..end.as_slice();
468		trace!(target: TARGET, rng = rng.sprint(), limit = limit, version = version, "Scan");
469		if beg > end {
470			return Ok(vec![]);
471		}
472		expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit, version).await })
473	}
474
475	/// Retrieve a batched scan over a specific range of keys in the datastore.
476	///
477	/// This function fetches keys, in batches, with multiple requests to the underlying datastore.
478	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
479	pub async fn batch_keys<K>(
480		&mut self,
481		rng: Range<K>,
482		batch: u32,
483		version: Option<u64>,
484	) -> Result<Batch<Key>, Error>
485	where
486		K: KeyEncode + Debug,
487	{
488		let beg: Key = rng.start.encode_owned()?;
489		let end: Key = rng.end.encode_owned()?;
490		let rng = beg.as_slice()..end.as_slice();
491		trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
492		expand_inner!(&mut self.inner, v => { v.batch_keys(beg..end, batch, version).await })
493	}
494
495	/// Count the total number of keys within a range in the datastore.
496	///
497	/// This function fetches the total count, in batches, with multiple requests to the underlying datastore.
498	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
499	pub async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
500	where
501		K: KeyEncode + Debug,
502	{
503		let beg: Key = rng.start.encode_owned()?;
504		let end: Key = rng.end.encode_owned()?;
505		let rng = beg.as_slice()..end.as_slice();
506		trace!(target: TARGET, rng = rng.sprint(), "Count");
507		expand_inner!(&mut self.inner, v => { v.count(beg..end).await })
508	}
509
510	/// Retrieve a batched scan over a specific range of keys in the datastore.
511	///
512	/// This function fetches key-value pairs, in batches, with multiple requests to the underlying datastore.
513	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
514	pub async fn batch_keys_vals<K>(
515		&mut self,
516		rng: Range<K>,
517		batch: u32,
518		version: Option<u64>,
519	) -> Result<Batch<(Key, Val)>, Error>
520	where
521		K: KeyEncode + Debug,
522	{
523		let beg: Key = rng.start.encode_owned()?;
524		let end: Key = rng.end.encode_owned()?;
525		let rng = beg.as_slice()..end.as_slice();
526		trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
527		expand_inner!(&mut self.inner, v => { v.batch_keys_vals(beg..end, batch, version).await })
528	}
529
530	/// Retrieve a batched scan of all versions over a specific range of keys in the datastore.
531	///
532	/// This function fetches key-value-version pairs, in batches, with multiple requests to the underlying datastore.
533	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
534	pub async fn batch_keys_vals_versions<K>(
535		&mut self,
536		rng: Range<K>,
537		batch: u32,
538	) -> Result<Batch<(Key, Val, Version, bool)>, Error>
539	where
540		K: KeyEncode + Debug,
541	{
542		let beg: Key = rng.start.encode_owned()?;
543		let end: Key = rng.end.encode_owned()?;
544		let rng = beg.as_slice()..end.as_slice();
545		trace!(target: TARGET, rng = rng.sprint(), "BatchVersions");
546		expand_inner!(&mut self.inner, v => { v.batch_keys_vals_versions(beg..end, batch).await })
547	}
548
549	/// Obtain a new change timestamp for a key
550	/// which is replaced with the current timestamp when the transaction is committed.
551	/// NOTE: This should be called when composing the change feed entries for this transaction,
552	/// which should be done immediately before the transaction commit.
553	/// That is to keep other transactions commit delay(pessimistic) or conflict(optimistic) as less as possible.
554	pub async fn get_timestamp<K>(&mut self, key: K) -> Result<VersionStamp, Error>
555	where
556		K: KeyEncode + Debug,
557	{
558		let key = key.encode_owned()?;
559		expand_inner!(&mut self.inner, v => { v.get_timestamp(key).await })
560	}
561
562	/// Insert or update a key in the datastore.
563	pub async fn set_versionstamped<K, V>(
564		&mut self,
565		ts_key: K,
566		prefix: K,
567		suffix: K,
568		val: V,
569	) -> Result<(), Error>
570	where
571		K: KeyEncode + Debug,
572		V: Into<Val> + Debug,
573	{
574		let ts_key = ts_key.encode_owned()?;
575		let prefix = prefix.encode_owned()?;
576		let suffix = suffix.encode_owned()?;
577		expand_inner!(&mut self.inner, v => { v.set_versionstamp(ts_key, prefix, suffix, val).await })
578	}
579
580	// --------------------------------------------------
581	// Additional methods
582	// --------------------------------------------------
583
	/// Clock retrieves the current timestamp, without guaranteeing
	/// monotonicity in all implementations.
	///
	/// It is used for unreliable ordering of events as well as
	/// handling of timeouts. Operations that are not guaranteed to be correct.
	/// But also allows for lexicographical ordering.
	///
	/// Public for tests, but not required for usage from a user perspective.
	///
	/// The value comes from the clock stored on this transactor, not from
	/// the backend transaction.
	pub async fn clock(&self) -> Timestamp {
		self.clock.now().await
	}
595
596	// change will record the change in the changefeed if enabled.
597	// To actually persist the record changes into the underlying kvs,
598	// you must call the `complete_changes` function and then commit the transaction.
599	#[allow(clippy::too_many_arguments)]
600	pub(crate) fn record_change(
601		&mut self,
602		ns: &str,
603		db: &str,
604		tb: &str,
605		id: &Thing,
606		previous: CursorValue,
607		current: CursorValue,
608		store_difference: bool,
609	) {
610		self.cf.record_cf_change(ns, db, tb, id.clone(), previous, current, store_difference)
611	}
612
	/// Records the table (re)definition in the changefeed if enabled.
	///
	/// The definition is handed to the change feed writer held by this
	/// transactor; nothing is written to the backend transaction here.
	pub(crate) fn record_table_change(
		&mut self,
		ns: &str,
		db: &str,
		tb: &str,
		dt: &DefineTableStatement,
	) {
		self.cf.define_table(ns, db, tb, dt)
	}
623
624	pub(crate) async fn get_idg(&mut self, key: &Key) -> Result<U32, Error> {
625		Ok(if let Some(v) = self.stash.get(key) {
626			v
627		} else {
628			let val = self.get(key.clone(), None).await?;
629			if let Some(val) = val {
630				U32::new(key.clone(), Some(val)).await?
631			} else {
632				U32::new(key.clone(), None).await?
633			}
634		})
635	}
636
637	/// Gets the next namespace id
638	pub(crate) async fn get_next_ns_id(&mut self) -> Result<u32, Error> {
639		let key = crate::key::root::ni::Ni::default().encode_owned()?;
640		let mut seq = self.get_idg(&key).await?;
641		let nid = seq.get_next_id();
642		self.stash.set(key, seq.clone());
643		let (k, v) = seq.finish().unwrap();
644		self.replace(k, v).await?;
645		Ok(nid)
646	}
647
648	/// Gets the next database id for the given namespace
649	pub(crate) async fn get_next_db_id(&mut self, ns: u32) -> Result<u32, Error> {
650		let key = crate::key::namespace::di::new(ns).encode_owned()?;
651		let mut seq = self.get_idg(&key).await?;
652		let nid = seq.get_next_id();
653		self.stash.set(key, seq.clone());
654		let (k, v) = seq.finish().unwrap();
655		self.replace(k, v).await?;
656		Ok(nid)
657	}
658
659	/// Gets the next table id for the given namespace and database
660	pub(crate) async fn get_next_tb_id(&mut self, ns: u32, db: u32) -> Result<u32, Error> {
661		let key = crate::key::database::ti::new(ns, db).encode_owned()?;
662		let mut seq = self.get_idg(&key).await?;
663		let nid = seq.get_next_id();
664		self.stash.set(key, seq.clone());
665		let (k, v) = seq.finish().unwrap();
666		self.replace(k, v).await?;
667		Ok(nid)
668	}
669
670	/// Removes the given namespace from the sequence.
671	#[allow(unused)]
672	pub(crate) async fn remove_ns_id(&mut self, ns: u32) -> Result<(), Error> {
673		let key = crate::key::root::ni::Ni::default().encode_owned()?;
674		let mut seq = self.get_idg(&key).await?;
675		seq.remove_id(ns);
676		self.stash.set(key, seq.clone());
677		let (k, v) = seq.finish().unwrap();
678		self.replace(k, v).await?;
679		Ok(())
680	}
681
682	/// Removes the given database from the sequence.
683	#[allow(unused)]
684	pub(crate) async fn remove_db_id(&mut self, ns: u32, db: u32) -> Result<(), Error> {
685		let key = crate::key::namespace::di::new(ns).encode_owned()?;
686		let mut seq = self.get_idg(&key).await?;
687		seq.remove_id(db);
688		self.stash.set(key, seq.clone());
689		let (k, v) = seq.finish().unwrap();
690		self.replace(k, v).await?;
691		Ok(())
692	}
693
694	/// Removes the given table from the sequence.
695	#[allow(unused)]
696	pub(crate) async fn remove_tb_id(&mut self, ns: u32, db: u32, tb: u32) -> Result<(), Error> {
697		let key = crate::key::database::ti::new(ns, db).encode_owned()?;
698		let mut seq = self.get_idg(&key).await?;
699		seq.remove_id(tb);
700		self.stash.set(key, seq.clone());
701		let (k, v) = seq.finish().unwrap();
702		self.replace(k, v).await?;
703		Ok(())
704	}
705
706	// complete_changes will complete the changefeed recording for the given namespace and database.
707	//
708	// Under the hood, this function calls the transaction's `set_versionstamped_key` for each change.
709	// Every change must be recorded by calling this struct's `record_change` function beforehand.
710	// If there were no preceding `record_change` function calls for this transaction, this function will do nothing.
711	//
712	// This function should be called only after all the changes have been made to the transaction.
713	// Otherwise, changes are missed in the change feed.
714	//
715	// This function should be called immediately before calling the commit function to guarantee that
716	// the lock, if needed by lock=true, is held only for the duration of the commit, not the entire transaction.
717	//
718	// This function is here because it needs access to mutably borrow the transaction.
719	//
720	// Lastly, you should set lock=true if you want the changefeed to be correctly ordered for
721	// non-FDB backends.
722	pub(crate) async fn complete_changes(&mut self, _lock: bool) -> Result<(), Error> {
723		let changes = self.cf.get()?;
724		for (tskey, prefix, suffix, v) in changes {
725			self.set_versionstamped(tskey, prefix, suffix, v).await?
726		}
727		Ok(())
728	}
729
730	// set_timestamp_for_versionstamp correlates the given timestamp with the current versionstamp.
731	// This allows get_versionstamp_from_timestamp to obtain the versionstamp from the timestamp later.
732	pub(crate) async fn set_timestamp_for_versionstamp(
733		&mut self,
734		ts: u64,
735		ns: &str,
736		db: &str,
737	) -> Result<VersionStamp, Error> {
738		// This also works as an advisory lock on the ts keys so that there is
739		// on other concurrent transactions that can write to the ts_key or the keys after it.
740		let key = crate::key::database::vs::new(ns, db);
741		let vst = self.get_timestamp(key).await?;
742		trace!(
743			target: TARGET,
744			"Setting timestamp {} for versionstamp {:?} in ns: {}, db: {}",
745			ts,
746			vst.into_u64_lossy(),
747			ns,
748			db
749		);
750
751		// Ensure there are no keys after the ts_key
752		// Otherwise we can go back in time!
753		let mut ts_key = crate::key::database::ts::new(ns, db, ts);
754		let begin = ts_key.encode()?;
755		let end = crate::key::database::ts::suffix(ns, db)?;
756		let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, None).await?;
757		let latest_ts_pair = ts_pairs.last();
758		if let Some((k, _)) = latest_ts_pair {
759			trace!(
760				target: TARGET,
761				"There already was a greater committed timestamp {} in ns: {}, db: {} found: {}",
762				ts,
763				ns,
764				db,
765				k.sprint()
766			);
767			let k = crate::key::database::ts::Ts::decode(k)?;
768			let latest_ts = k.ts;
769			if latest_ts >= ts {
770				warn!("ts {ts} is less than the latest ts {latest_ts}");
771				ts_key = crate::key::database::ts::new(ns, db, latest_ts + 1);
772			}
773		}
774		self.replace(ts_key, vst.as_bytes()).await?;
775		Ok(vst)
776	}
777
778	pub(crate) async fn get_versionstamp_from_timestamp(
779		&mut self,
780		ts: u64,
781		ns: &str,
782		db: &str,
783	) -> Result<Option<VersionStamp>, Error> {
784		let start = crate::key::database::ts::prefix(ns, db)?;
785		let ts_key = crate::key::database::ts::new(ns, db, ts + 1).encode_owned()?;
786		let end = ts_key.encode_owned()?;
787		let ts_pairs = self.getr(start..end, None).await?;
788		let latest_ts_pair = ts_pairs.last();
789		if let Some((_, v)) = latest_ts_pair {
790			return Ok(Some(VersionStamp::from_slice(v)?));
791		}
792		Ok(None)
793	}
794
795	pub(crate) async fn new_save_point(&mut self) {
796		expand_inner!(&mut self.inner, v => { v.new_save_point() })
797	}
798
799	pub(crate) async fn rollback_to_save_point(&mut self) -> Result<(), Error> {
800		expand_inner!(&mut self.inner, v => { v.rollback_to_save_point().await })
801	}
802
803	pub(crate) async fn release_last_save_point(&mut self) -> Result<(), Error> {
804		expand_inner!(&mut self.inner, v => { v.release_last_save_point() })
805	}
806}