surrealdb_core/kvs/
export.rs

1use super::KeyDecode as _;
2use super::Transaction;
3use crate::cnf::EXPORT_BATCH_SIZE;
4use crate::err::Error;
5use crate::key::thing;
6use crate::sql::paths::EDGE;
7use crate::sql::paths::IN;
8use crate::sql::paths::OUT;
9use crate::sql::statements::DefineTableStatement;
10use crate::sql::Value;
11use async_channel::Sender;
12use chrono::prelude::Utc;
13use chrono::TimeZone;
14
15#[derive(Clone, Debug)]
16pub struct Config {
17	pub users: bool,
18	pub accesses: bool,
19	pub params: bool,
20	pub functions: bool,
21	pub analyzers: bool,
22	pub tables: TableConfig,
23	pub versions: bool,
24	pub records: bool,
25}
26
27impl Default for Config {
28	fn default() -> Config {
29		Config {
30			users: true,
31			accesses: true,
32			params: true,
33			functions: true,
34			analyzers: true,
35			tables: TableConfig::default(),
36			versions: false,
37			records: true,
38		}
39	}
40}
41
42impl From<Config> for Value {
43	fn from(config: Config) -> Value {
44		let obj = map!(
45			"users" => config.users.into(),
46			"accesses" => config.accesses.into(),
47			"params" => config.params.into(),
48			"functions" => config.functions.into(),
49			"analyzers" => config.analyzers.into(),
50			"versions" => config.versions.into(),
51			"records" => config.records.into(),
52			"tables" => match config.tables {
53				TableConfig::All => true.into(),
54				TableConfig::None => false.into(),
55				TableConfig::Some(v) => v.into()
56			}
57		);
58
59		obj.into()
60	}
61}
62
63impl TryFrom<&Value> for Config {
64	type Error = Error;
65	fn try_from(value: &Value) -> Result<Self, Self::Error> {
66		match value {
67			Value::Object(obj) => {
68				let mut config = Config::default();
69
70				macro_rules! bool_prop {
71					($prop:ident) => {{
72						match obj.get(stringify!($prop)) {
73							Some(Value::Bool(v)) => {
74								config.$prop = v.to_owned();
75							}
76							Some(v) => {
77								return Err(Error::InvalidExportConfig(
78									v.to_owned(),
79									"a bool".into(),
80								))
81							}
82							_ => (),
83						}
84					}};
85				}
86
87				bool_prop!(users);
88				bool_prop!(accesses);
89				bool_prop!(params);
90				bool_prop!(functions);
91				bool_prop!(analyzers);
92				bool_prop!(versions);
93				bool_prop!(records);
94
95				if let Some(v) = obj.get("tables") {
96					config.tables = v.try_into()?;
97				}
98
99				Ok(config)
100			}
101			v => Err(Error::InvalidExportConfig(v.to_owned(), "an object".into())),
102		}
103	}
104}
105
/// Selects which tables take part in an export.
#[derive(Debug, Clone, Default)]
pub enum TableConfig {
	/// Every table is exported. This is the default.
	#[default]
	All,
	/// No table is exported.
	None,
	/// Only the tables with the listed names are exported.
	Some(Vec<String>),
}
113
114impl From<bool> for TableConfig {
115	fn from(value: bool) -> Self {
116		match value {
117			true => TableConfig::All,
118			false => TableConfig::None,
119		}
120	}
121}
122
123impl From<Vec<String>> for TableConfig {
124	fn from(value: Vec<String>) -> Self {
125		TableConfig::Some(value)
126	}
127}
128
129impl From<Vec<&str>> for TableConfig {
130	fn from(value: Vec<&str>) -> Self {
131		TableConfig::Some(value.into_iter().map(ToOwned::to_owned).collect())
132	}
133}
134
135impl TryFrom<&Value> for TableConfig {
136	type Error = Error;
137	fn try_from(value: &Value) -> Result<Self, Self::Error> {
138		match value {
139			Value::Bool(b) => match b {
140				true => Ok(TableConfig::All),
141				false => Ok(TableConfig::None),
142			},
143			Value::None | Value::Null => Ok(TableConfig::None),
144			Value::Array(v) => v
145				.iter()
146				.cloned()
147				.map(|v| match v {
148					Value::Strand(str) => Ok(str.0),
149					v => Err(Error::InvalidExportConfig(v.to_owned(), "a string".into())),
150				})
151				.collect::<Result<Vec<String>, Error>>()
152				.map(TableConfig::Some),
153			v => Err(Error::InvalidExportConfig(
154				v.to_owned(),
155				"a bool, none, null or array<string>".into(),
156			)),
157		}
158	}
159}
160
161impl TableConfig {
162	/// Check if we should export tables
163	pub(crate) fn is_any(&self) -> bool {
164		matches!(self, Self::All | Self::Some(_))
165	}
166	// Check if we should export a specific table
167	pub(crate) fn includes(&self, table: &str) -> bool {
168		match self {
169			Self::All => true,
170			Self::None => false,
171			Self::Some(v) => v.iter().any(|v| v.eq(table)),
172		}
173	}
174}
175
176impl Transaction {
177	/// Writes the full database contents as binary SQL.
178	pub async fn export(
179		&self,
180		ns: &str,
181		db: &str,
182		cfg: Config,
183		chn: Sender<Vec<u8>>,
184	) -> Result<(), Error> {
185		// Output USERS, ACCESSES, PARAMS, FUNCTIONS, ANALYZERS
186		self.export_metadata(&cfg, &chn, ns, db).await?;
187		// Output TABLES
188		self.export_tables(ns, db, &cfg, &chn).await?;
189		Ok(())
190	}
191
192	async fn export_metadata(
193		&self,
194		cfg: &Config,
195		chn: &Sender<Vec<u8>>,
196		ns: &str,
197		db: &str,
198	) -> Result<(), Error> {
199		// Output OPTIONS
200		self.export_section("OPTION", vec!["OPTION IMPORT"], chn).await?;
201
202		// Output USERS
203		if cfg.users {
204			let users = self.all_db_users(ns, db).await?;
205			self.export_section("USERS", users.to_vec(), chn).await?;
206		}
207
208		// Output ACCESSES
209		if cfg.accesses {
210			let accesses = self.all_db_accesses(ns, db).await?;
211			self.export_section("ACCESSES", accesses.to_vec(), chn).await?;
212		}
213
214		// Output PARAMS
215		if cfg.params {
216			let params = self.all_db_params(ns, db).await?;
217			self.export_section("PARAMS", params.to_vec(), chn).await?;
218		}
219
220		// Output FUNCTIONS
221		if cfg.functions {
222			let functions = self.all_db_functions(ns, db).await?;
223			self.export_section("FUNCTIONS", functions.to_vec(), chn).await?;
224		}
225
226		// Output ANALYZERS
227		if cfg.analyzers {
228			let analyzers = self.all_db_analyzers(ns, db).await?;
229			self.export_section("ANALYZERS", analyzers.to_vec(), chn).await?;
230		}
231
232		Ok(())
233	}
234
235	async fn export_section<T: ToString>(
236		&self,
237		title: &str,
238		items: Vec<T>,
239		chn: &Sender<Vec<u8>>,
240	) -> Result<(), Error> {
241		if items.is_empty() {
242			return Ok(());
243		}
244
245		chn.send(bytes!("-- ------------------------------")).await?;
246		chn.send(bytes!(format!("-- {}", title))).await?;
247		chn.send(bytes!("-- ------------------------------")).await?;
248		chn.send(bytes!("")).await?;
249
250		for item in items {
251			chn.send(bytes!(format!("{};", item.to_string()))).await?;
252		}
253
254		chn.send(bytes!("")).await?;
255		Ok(())
256	}
257
258	async fn export_tables(
259		&self,
260		ns: &str,
261		db: &str,
262		cfg: &Config,
263		chn: &Sender<Vec<u8>>,
264	) -> Result<(), Error> {
265		// Check if tables are included in the export config
266		if !cfg.tables.is_any() {
267			return Ok(());
268		}
269		// Fetch all of the tables for this NS / DB
270		let tables = self.all_tb(ns, db, None).await?;
271		// Loop over all of the tables in order
272		for table in tables.iter() {
273			// Check if this table is included in the export config
274			if !cfg.tables.includes(&table.name) {
275				continue;
276			}
277			// Export the table definition structure first
278			self.export_table_structure(ns, db, table, chn).await?;
279			// Then export the table data if its desired
280			if cfg.records {
281				self.export_table_data(ns, db, table, cfg, chn).await?;
282			}
283		}
284
285		Ok(())
286	}
287
288	async fn export_table_structure(
289		&self,
290		ns: &str,
291		db: &str,
292		table: &DefineTableStatement,
293		chn: &Sender<Vec<u8>>,
294	) -> Result<(), Error> {
295		chn.send(bytes!("-- ------------------------------")).await?;
296		chn.send(bytes!(format!("-- TABLE: {}", table.name))).await?;
297		chn.send(bytes!("-- ------------------------------")).await?;
298		chn.send(bytes!("")).await?;
299		chn.send(bytes!(format!("{};", table))).await?;
300		chn.send(bytes!("")).await?;
301		// Export all table field definitions for this table
302		let fields = self.all_tb_fields(ns, db, &table.name, None).await?;
303		for field in fields.iter() {
304			chn.send(bytes!(format!("{};", field))).await?;
305		}
306		chn.send(bytes!("")).await?;
307		// Export all table index definitions for this table
308		let indexes = self.all_tb_indexes(ns, db, &table.name).await?;
309		for index in indexes.iter() {
310			chn.send(bytes!(format!("{};", index))).await?;
311		}
312		chn.send(bytes!("")).await?;
313		// Export all table event definitions for this table
314		let events = self.all_tb_events(ns, db, &table.name).await?;
315		for event in events.iter() {
316			chn.send(bytes!(format!("{};", event))).await?;
317		}
318		chn.send(bytes!("")).await?;
319		// Everything ok
320		Ok(())
321	}
322
323	async fn export_table_data(
324		&self,
325		ns: &str,
326		db: &str,
327		table: &DefineTableStatement,
328		cfg: &Config,
329		chn: &Sender<Vec<u8>>,
330	) -> Result<(), Error> {
331		chn.send(bytes!("-- ------------------------------")).await?;
332		chn.send(bytes!(format!("-- TABLE DATA: {}", table.name))).await?;
333		chn.send(bytes!("-- ------------------------------")).await?;
334		chn.send(bytes!("")).await?;
335
336		let beg = crate::key::thing::prefix(ns, db, &table.name)?;
337		let end = crate::key::thing::suffix(ns, db, &table.name)?;
338		let mut next = Some(beg..end);
339
340		while let Some(rng) = next {
341			if cfg.versions {
342				let batch = self.batch_keys_vals_versions(rng, *EXPORT_BATCH_SIZE).await?;
343				next = batch.next;
344				// If there are no versioned values, return early.
345				if batch.result.is_empty() {
346					break;
347				}
348				self.export_versioned_data(batch.result, chn).await?;
349			} else {
350				let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
351				next = batch.next;
352				// If there are no values, return early.
353				if batch.result.is_empty() {
354					break;
355				}
356				self.export_regular_data(batch.result, chn).await?;
357			}
358			// Fetch more records
359			continue;
360		}
361
362		chn.send(bytes!("")).await?;
363		Ok(())
364	}
365
366	/// Processes a value and generates the appropriate SQL command.
367	///
368	/// This function processes a value, categorizing it into either normal records or graph edge records,
369	/// and generates the appropriate SQL command based on the type of record and the presence of a version.
370	///
371	/// # Arguments
372	///
373	/// * `v` - The value to be processed.
374	/// * `records_relate` - A mutable reference to a vector that holds graph edge records.
375	/// * `records_normal` - A mutable reference to a vector that holds normal records.
376	/// * `is_tombstone` - An optional boolean indicating if the record is a tombstone.
377	/// * `version` - An optional version number for the record.
378	///
379	/// # Returns
380	///
381	/// * `String` - Returns the generated SQL command as a string. If no command is generated, returns an empty string.
382	fn process_value(
383		k: thing::Thing,
384		v: Value,
385		records_relate: &mut Vec<String>,
386		records_normal: &mut Vec<String>,
387		is_tombstone: Option<bool>,
388		version: Option<u64>,
389	) -> String {
390		// Match on the value to determine if it is a graph edge record or a normal record.
391		match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
392			// If the value is a graph edge record (indicated by EDGE, IN, and OUT fields):
393			(Value::Bool(true), Value::Thing(_), Value::Thing(_)) => {
394				if let Some(version) = version {
395					// If a version exists, format the value as an INSERT RELATION VERSION command.
396					let ts = Utc.timestamp_nanos(version as i64);
397					let sql = format!("INSERT RELATION {} VERSION d'{:?}';", v, ts);
398					records_relate.push(sql);
399					String::new()
400				} else {
401					// If no version exists, push the value to the records_relate vector.
402					records_relate.push(v.to_string());
403					String::new()
404				}
405			}
406			// If the value is a normal record:
407			_ => {
408				if let Some(is_tombstone) = is_tombstone {
409					if is_tombstone {
410						// If the record is a tombstone, format it as a DELETE command.
411						format!("DELETE {}:{};", k.tb, k.id)
412					} else {
413						// If the record is not a tombstone and a version exists, format it as an INSERT VERSION command.
414						let ts = Utc.timestamp_nanos(version.unwrap() as i64);
415						format!("INSERT {} VERSION d'{:?}';", v, ts)
416					}
417				} else {
418					// If no tombstone or version information is provided, push the value to the records_normal vector.
419					records_normal.push(v.to_string());
420					String::new()
421				}
422			}
423		}
424	}
425
426	/// Exports versioned data to the provided channel.
427	///
428	/// This function processes a list of versioned values, converting them into SQL commands
429	/// and sending them to the provided channel. It handles both normal records and graph edge records,
430	/// and ensures that the appropriate SQL commands are generated for each type of record.
431	///
432	/// # Arguments
433	///
434	/// * `versioned_values` - A vector of tuples containing the versioned values to be exported.
435	///   Each tuple consists of a key, value, version, and a boolean indicating if the record is a tombstone.
436	/// * `chn` - A reference to the channel to which the SQL commands will be sent.
437	///
438	/// # Returns
439	///
440	/// * `Result<(), Error>` - Returns `Ok(())` if the operation is successful, or an `Error` if an error occurs.
441	async fn export_versioned_data(
442		&self,
443		versioned_values: Vec<(Vec<u8>, Vec<u8>, u64, bool)>,
444		chn: &Sender<Vec<u8>>,
445	) -> Result<(), Error> {
446		// Initialize a vector to hold graph edge records.
447		let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
448
449		// Initialize a counter for the number of processed records.
450		let mut counter = 0;
451
452		// Process each versioned value.
453		for (k, v, version, is_tombstone) in versioned_values {
454			// Begin a new transaction at the beginning of each batch.
455			if counter % *EXPORT_BATCH_SIZE == 0 {
456				chn.send(bytes!("BEGIN;")).await?;
457			}
458
459			let k = thing::Thing::decode(&k)?;
460			let v: Value = if v.is_empty() {
461				Value::None
462			} else {
463				revision::from_slice(&v)?
464			};
465			// Process the value and generate the appropriate SQL command.
466			let sql = Self::process_value(
467				k,
468				v,
469				&mut records_relate,
470				&mut Vec::new(),
471				Some(is_tombstone),
472				Some(version),
473			);
474			// If the SQL command is not empty, send it to the channel.
475			if !sql.is_empty() {
476				chn.send(bytes!(sql)).await?;
477			}
478
479			// Increment the counter.
480			counter += 1;
481
482			// Commit the transaction at the end of each batch.
483			if counter % *EXPORT_BATCH_SIZE == 0 {
484				chn.send(bytes!("COMMIT;")).await?;
485			}
486		}
487
488		// Commit any remaining records if the last batch was not full.
489		if counter % *EXPORT_BATCH_SIZE != 0 {
490			chn.send(bytes!("COMMIT;")).await?;
491		}
492
493		// If there are no graph edge records, return early.
494		if records_relate.is_empty() {
495			return Ok(());
496		}
497
498		// Begin a new transaction for graph edge records.
499		chn.send(bytes!("BEGIN;")).await?;
500
501		// If there are graph edge records, send them to the channel.
502		if !records_relate.is_empty() {
503			for record in records_relate.iter() {
504				chn.send(bytes!(record)).await?;
505			}
506		}
507
508		// Commit the transaction for graph edge records.
509		chn.send(bytes!("COMMIT;")).await?;
510
511		Ok(())
512	}
513
514	/// Exports regular data to the provided channel.
515	///
516	/// This function processes a list of regular values, converting them into SQL commands
517	/// and sending them to the provided channel. It handles both normal records and graph edge records,
518	/// and ensures that the appropriate SQL commands are generated for each type of record.
519	///
520	/// # Arguments
521	///
522	/// * `regular_values` - A vector of tuples containing the regular values to be exported.
523	///   Each tuple consists of a key and a value.
524	/// * `chn` - A reference to the channel to which the SQL commands will be sent.
525	///
526	/// # Returns
527	///
528	/// * `Result<(), Error>` - Returns `Ok(())` if the operation is successful, or an `Error` if an error occurs.
529	async fn export_regular_data(
530		&self,
531		regular_values: Vec<(Vec<u8>, Vec<u8>)>,
532		chn: &Sender<Vec<u8>>,
533	) -> Result<(), Error> {
534		// Initialize vectors to hold normal records and graph edge records.
535		let mut records_normal = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
536		let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
537
538		// Process each regular value.
539		for (k, v) in regular_values {
540			let k = thing::Thing::decode(&k)?;
541			let v: Value = revision::from_slice(&v)?;
542			// Process the value and categorize it into records_relate or records_normal.
543			Self::process_value(k, v, &mut records_relate, &mut records_normal, None, None);
544		}
545
546		// If there are normal records, generate and send the INSERT SQL command.
547		if !records_normal.is_empty() {
548			let values = records_normal.join(", ");
549			let sql = format!("INSERT [ {} ];", values);
550			chn.send(bytes!(sql)).await?;
551		}
552
553		// If there are graph edge records, generate and send the INSERT RELATION SQL command.
554		if !records_relate.is_empty() {
555			let values = records_relate.join(", ");
556			let sql = format!("INSERT RELATION [ {} ];", values);
557			chn.send(bytes!(sql)).await?;
558		}
559
560		Ok(())
561	}
562}