surrealdb_core/kvs/
export.rs

1use super::KeyDecode as _;
2use super::Transaction;
3use crate::cnf::EXPORT_BATCH_SIZE;
4use crate::err::Error;
5use crate::key::thing;
6use crate::sql::paths::EDGE;
7use crate::sql::paths::IN;
8use crate::sql::paths::OUT;
9use crate::sql::statements::DefineTableStatement;
10use crate::sql::Value;
11use async_channel::Sender;
12use chrono::prelude::Utc;
13use chrono::TimeZone;
14use std::fmt;
15
16#[derive(Clone, Debug)]
17pub struct Config {
18	pub users: bool,
19	pub accesses: bool,
20	pub params: bool,
21	pub functions: bool,
22	pub analyzers: bool,
23	pub tables: TableConfig,
24	pub versions: bool,
25	pub records: bool,
26}
27
28impl Default for Config {
29	fn default() -> Config {
30		Config {
31			users: true,
32			accesses: true,
33			params: true,
34			functions: true,
35			analyzers: true,
36			tables: TableConfig::default(),
37			versions: false,
38			records: true,
39		}
40	}
41}
42
43impl From<Config> for Value {
44	fn from(config: Config) -> Value {
45		let obj = map!(
46			"users" => config.users.into(),
47			"accesses" => config.accesses.into(),
48			"params" => config.params.into(),
49			"functions" => config.functions.into(),
50			"analyzers" => config.analyzers.into(),
51			"versions" => config.versions.into(),
52			"records" => config.records.into(),
53			"tables" => match config.tables {
54				TableConfig::All => true.into(),
55				TableConfig::None => false.into(),
56				TableConfig::Some(v) => v.into()
57			}
58		);
59
60		obj.into()
61	}
62}
63
64impl TryFrom<&Value> for Config {
65	type Error = Error;
66	fn try_from(value: &Value) -> Result<Self, Self::Error> {
67		match value {
68			Value::Object(obj) => {
69				let mut config = Config::default();
70
71				macro_rules! bool_prop {
72					($prop:ident) => {{
73						match obj.get(stringify!($prop)) {
74							Some(Value::Bool(v)) => {
75								config.$prop = v.to_owned();
76							}
77							Some(v) => {
78								return Err(Error::InvalidExportConfig(
79									v.to_owned(),
80									"a bool".into(),
81								))
82							}
83							_ => (),
84						}
85					}};
86				}
87
88				bool_prop!(users);
89				bool_prop!(accesses);
90				bool_prop!(params);
91				bool_prop!(functions);
92				bool_prop!(analyzers);
93				bool_prop!(versions);
94				bool_prop!(records);
95
96				if let Some(v) = obj.get("tables") {
97					config.tables = v.try_into()?;
98				}
99
100				Ok(config)
101			}
102			v => Err(Error::InvalidExportConfig(v.to_owned(), "an object".into())),
103		}
104	}
105}
106
/// Selects which tables an export includes.
#[derive(Clone, Debug, Default)]
pub enum TableConfig {
	/// Export every table (the default).
	#[default]
	All,
	/// Export no tables.
	None,
	/// Export only the tables with the listed names.
	Some(Vec<String>),
}

impl From<bool> for TableConfig {
	/// Maps `true` to [`TableConfig::All`] and `false` to [`TableConfig::None`].
	fn from(value: bool) -> Self {
		if value {
			TableConfig::All
		} else {
			TableConfig::None
		}
	}
}

impl From<Vec<String>> for TableConfig {
	/// Selects exactly the named tables.
	fn from(value: Vec<String>) -> Self {
		TableConfig::Some(value)
	}
}

impl From<Vec<&str>> for TableConfig {
	/// Selects exactly the named tables, copying each name into an owned string.
	fn from(value: Vec<&str>) -> Self {
		TableConfig::Some(value.into_iter().map(String::from).collect())
	}
}
135
136impl TryFrom<&Value> for TableConfig {
137	type Error = Error;
138	fn try_from(value: &Value) -> Result<Self, Self::Error> {
139		match value {
140			Value::Bool(b) => match b {
141				true => Ok(TableConfig::All),
142				false => Ok(TableConfig::None),
143			},
144			Value::None | Value::Null => Ok(TableConfig::None),
145			Value::Array(v) => v
146				.iter()
147				.cloned()
148				.map(|v| match v {
149					Value::Strand(str) => Ok(str.0),
150					v => Err(Error::InvalidExportConfig(v.to_owned(), "a string".into())),
151				})
152				.collect::<Result<Vec<String>, Error>>()
153				.map(TableConfig::Some),
154			v => Err(Error::InvalidExportConfig(
155				v.to_owned(),
156				"a bool, none, null or array<string>".into(),
157			)),
158		}
159	}
160}
161
162impl TableConfig {
163	/// Check if we should export tables
164	pub(crate) fn is_any(&self) -> bool {
165		matches!(self, Self::All | Self::Some(_))
166	}
167	// Check if we should export a specific table
168	pub(crate) fn includes(&self, table: &str) -> bool {
169		match self {
170			Self::All => true,
171			Self::None => false,
172			Self::Some(v) => v.iter().any(|v| v.eq(table)),
173		}
174	}
175}
176
/// A [`fmt::Write`] adapter which escapes line-breaking characters, so that
/// whatever is written through it stays on a single line.
struct InlineCommentWriter<'a, F>(&'a mut F);

impl<F: fmt::Write> fmt::Write for InlineCommentWriter<'_, F> {
	/// Writes `s` one character at a time so every character is escaped if needed.
	fn write_str(&mut self, s: &str) -> fmt::Result {
		s.chars().try_for_each(|c| self.write_char(c))
	}

	/// Writes `c` to the inner writer, replacing a line-breaking character
	/// with its textual escape sequence.
	fn write_char(&mut self, c: char) -> fmt::Result {
		match c {
			'\n' => self.0.write_str("\\n"),
			'\r' => self.0.write_str("\\r"),
			// NEL/Next Line
			'\u{0085}' => self.0.write_str("\\u{0085}"),
			// Line separator
			'\u{2028}' => self.0.write_str("\\u{2028}"),
			// Paragraph separator
			'\u{2029}' => self.0.write_str("\\u{2029}"),
			_ => self.0.write_char(c),
		}
	}
}

/// Wraps a displayable value so that it is formatted through
/// [`InlineCommentWriter`], i.e. with line-breaking characters escaped.
struct InlineCommentDisplay<F>(F);

impl<F: fmt::Display> fmt::Display for InlineCommentDisplay<F> {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		fmt::Write::write_fmt(&mut InlineCommentWriter(f), format_args!("{}", self.0))
	}
}
207
208impl Transaction {
209	/// Writes the full database contents as binary SQL.
210	pub async fn export(
211		&self,
212		ns: &str,
213		db: &str,
214		cfg: Config,
215		chn: Sender<Vec<u8>>,
216	) -> Result<(), Error> {
217		// Output USERS, ACCESSES, PARAMS, FUNCTIONS, ANALYZERS
218		self.export_metadata(&cfg, &chn, ns, db).await?;
219		// Output TABLES
220		self.export_tables(ns, db, &cfg, &chn).await?;
221		Ok(())
222	}
223
224	async fn export_metadata(
225		&self,
226		cfg: &Config,
227		chn: &Sender<Vec<u8>>,
228		ns: &str,
229		db: &str,
230	) -> Result<(), Error> {
231		// Output OPTIONS
232		self.export_section("OPTION", vec!["OPTION IMPORT"], chn).await?;
233
234		// Output USERS
235		if cfg.users {
236			let users = self.all_db_users(ns, db).await?;
237			self.export_section("USERS", users.to_vec(), chn).await?;
238		}
239
240		// Output ACCESSES
241		if cfg.accesses {
242			let accesses = self.all_db_accesses(ns, db).await?;
243			self.export_section("ACCESSES", accesses.to_vec(), chn).await?;
244		}
245
246		// Output PARAMS
247		if cfg.params {
248			let params = self.all_db_params(ns, db).await?;
249			self.export_section("PARAMS", params.to_vec(), chn).await?;
250		}
251
252		// Output FUNCTIONS
253		if cfg.functions {
254			let functions = self.all_db_functions(ns, db).await?;
255			self.export_section("FUNCTIONS", functions.to_vec(), chn).await?;
256		}
257
258		// Output ANALYZERS
259		if cfg.analyzers {
260			let analyzers = self.all_db_analyzers(ns, db).await?;
261			self.export_section("ANALYZERS", analyzers.to_vec(), chn).await?;
262		}
263
264		Ok(())
265	}
266
267	async fn export_section<T: ToString>(
268		&self,
269		title: &str,
270		items: Vec<T>,
271		chn: &Sender<Vec<u8>>,
272	) -> Result<(), Error> {
273		if items.is_empty() {
274			return Ok(());
275		}
276
277		chn.send(bytes!("-- ------------------------------")).await?;
278		chn.send(bytes!(format!("-- {}", InlineCommentDisplay(title)))).await?;
279		chn.send(bytes!("-- ------------------------------")).await?;
280		chn.send(bytes!("")).await?;
281
282		for item in items {
283			chn.send(bytes!(format!("{};", item.to_string()))).await?;
284		}
285
286		chn.send(bytes!("")).await?;
287		Ok(())
288	}
289
290	async fn export_tables(
291		&self,
292		ns: &str,
293		db: &str,
294		cfg: &Config,
295		chn: &Sender<Vec<u8>>,
296	) -> Result<(), Error> {
297		// Check if tables are included in the export config
298		if !cfg.tables.is_any() {
299			return Ok(());
300		}
301		// Fetch all of the tables for this NS / DB
302		let tables = self.all_tb(ns, db, None).await?;
303		// Loop over all of the tables in order
304		for table in tables.iter() {
305			// Check if this table is included in the export config
306			if !cfg.tables.includes(&table.name) {
307				continue;
308			}
309			// Export the table definition structure first
310			self.export_table_structure(ns, db, table, chn).await?;
311			// Then export the table data if its desired
312			if cfg.records {
313				self.export_table_data(ns, db, table, cfg, chn).await?;
314			}
315		}
316
317		Ok(())
318	}
319
320	async fn export_table_structure(
321		&self,
322		ns: &str,
323		db: &str,
324		table: &DefineTableStatement,
325		chn: &Sender<Vec<u8>>,
326	) -> Result<(), Error> {
327		chn.send(bytes!("-- ------------------------------")).await?;
328		chn.send(bytes!(format!("-- TABLE: {}", InlineCommentDisplay(&table.name)))).await?;
329		chn.send(bytes!("-- ------------------------------")).await?;
330		chn.send(bytes!("")).await?;
331		chn.send(bytes!(format!("{};", table))).await?;
332		chn.send(bytes!("")).await?;
333		// Export all table field definitions for this table
334		let fields = self.all_tb_fields(ns, db, &table.name, None).await?;
335		for field in fields.iter() {
336			chn.send(bytes!(format!("{};", field))).await?;
337		}
338		chn.send(bytes!("")).await?;
339		// Export all table index definitions for this table
340		let indexes = self.all_tb_indexes(ns, db, &table.name).await?;
341		for index in indexes.iter() {
342			chn.send(bytes!(format!("{};", index))).await?;
343		}
344		chn.send(bytes!("")).await?;
345		// Export all table event definitions for this table
346		let events = self.all_tb_events(ns, db, &table.name).await?;
347		for event in events.iter() {
348			chn.send(bytes!(format!("{};", event))).await?;
349		}
350		chn.send(bytes!("")).await?;
351		// Everything ok
352		Ok(())
353	}
354
355	async fn export_table_data(
356		&self,
357		ns: &str,
358		db: &str,
359		table: &DefineTableStatement,
360		cfg: &Config,
361		chn: &Sender<Vec<u8>>,
362	) -> Result<(), Error> {
363		chn.send(bytes!("-- ------------------------------")).await?;
364		chn.send(bytes!(format!("-- TABLE DATA: {}", InlineCommentDisplay(&table.name)))).await?;
365		chn.send(bytes!("-- ------------------------------")).await?;
366		chn.send(bytes!("")).await?;
367
368		let beg = crate::key::thing::prefix(ns, db, &table.name)?;
369		let end = crate::key::thing::suffix(ns, db, &table.name)?;
370		let mut next = Some(beg..end);
371
372		while let Some(rng) = next {
373			if cfg.versions {
374				let batch = self.batch_keys_vals_versions(rng, *EXPORT_BATCH_SIZE).await?;
375				next = batch.next;
376				// If there are no versioned values, return early.
377				if batch.result.is_empty() {
378					break;
379				}
380				self.export_versioned_data(batch.result, chn).await?;
381			} else {
382				let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
383				next = batch.next;
384				// If there are no values, return early.
385				if batch.result.is_empty() {
386					break;
387				}
388				self.export_regular_data(batch.result, chn).await?;
389			}
390			// Fetch more records
391			continue;
392		}
393
394		chn.send(bytes!("")).await?;
395		Ok(())
396	}
397
398	/// Processes a value and generates the appropriate SQL command.
399	///
400	/// This function processes a value, categorizing it into either normal records or graph edge records,
401	/// and generates the appropriate SQL command based on the type of record and the presence of a version.
402	///
403	/// # Arguments
404	///
405	/// * `v` - The value to be processed.
406	/// * `records_relate` - A mutable reference to a vector that holds graph edge records.
407	/// * `records_normal` - A mutable reference to a vector that holds normal records.
408	/// * `is_tombstone` - An optional boolean indicating if the record is a tombstone.
409	/// * `version` - An optional version number for the record.
410	///
411	/// # Returns
412	///
413	/// * `String` - Returns the generated SQL command as a string. If no command is generated, returns an empty string.
414	fn process_value(
415		k: thing::Thing,
416		v: Value,
417		records_relate: &mut Vec<String>,
418		records_normal: &mut Vec<String>,
419		is_tombstone: Option<bool>,
420		version: Option<u64>,
421	) -> String {
422		// Match on the value to determine if it is a graph edge record or a normal record.
423		match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
424			// If the value is a graph edge record (indicated by EDGE, IN, and OUT fields):
425			(Value::Bool(true), Value::Thing(_), Value::Thing(_)) => {
426				if let Some(version) = version {
427					// If a version exists, format the value as an INSERT RELATION VERSION command.
428					let ts = Utc.timestamp_nanos(version as i64);
429					let sql = format!("INSERT RELATION {} VERSION d'{:?}';", v, ts);
430					records_relate.push(sql);
431					String::new()
432				} else {
433					// If no version exists, push the value to the records_relate vector.
434					records_relate.push(v.to_string());
435					String::new()
436				}
437			}
438			// If the value is a normal record:
439			_ => {
440				if let Some(is_tombstone) = is_tombstone {
441					if is_tombstone {
442						// If the record is a tombstone, format it as a DELETE command.
443						format!("DELETE {}:{};", k.tb, k.id)
444					} else {
445						// If the record is not a tombstone and a version exists, format it as an INSERT VERSION command.
446						let ts = Utc.timestamp_nanos(version.unwrap() as i64);
447						format!("INSERT {} VERSION d'{:?}';", v, ts)
448					}
449				} else {
450					// If no tombstone or version information is provided, push the value to the records_normal vector.
451					records_normal.push(v.to_string());
452					String::new()
453				}
454			}
455		}
456	}
457
458	/// Exports versioned data to the provided channel.
459	///
460	/// This function processes a list of versioned values, converting them into SQL commands
461	/// and sending them to the provided channel. It handles both normal records and graph edge records,
462	/// and ensures that the appropriate SQL commands are generated for each type of record.
463	///
464	/// # Arguments
465	///
466	/// * `versioned_values` - A vector of tuples containing the versioned values to be exported.
467	///   Each tuple consists of a key, value, version, and a boolean indicating if the record is a tombstone.
468	/// * `chn` - A reference to the channel to which the SQL commands will be sent.
469	///
470	/// # Returns
471	///
472	/// * `Result<(), Error>` - Returns `Ok(())` if the operation is successful, or an `Error` if an error occurs.
473	async fn export_versioned_data(
474		&self,
475		versioned_values: Vec<(Vec<u8>, Vec<u8>, u64, bool)>,
476		chn: &Sender<Vec<u8>>,
477	) -> Result<(), Error> {
478		// Initialize a vector to hold graph edge records.
479		let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
480
481		// Initialize a counter for the number of processed records.
482		let mut counter = 0;
483
484		// Process each versioned value.
485		for (k, v, version, is_tombstone) in versioned_values {
486			// Begin a new transaction at the beginning of each batch.
487			if counter % *EXPORT_BATCH_SIZE == 0 {
488				chn.send(bytes!("BEGIN;")).await?;
489			}
490
491			let k = thing::Thing::decode(&k)?;
492			let v: Value = if v.is_empty() {
493				Value::None
494			} else {
495				revision::from_slice(&v)?
496			};
497			// Process the value and generate the appropriate SQL command.
498			let sql = Self::process_value(
499				k,
500				v,
501				&mut records_relate,
502				&mut Vec::new(),
503				Some(is_tombstone),
504				Some(version),
505			);
506			// If the SQL command is not empty, send it to the channel.
507			if !sql.is_empty() {
508				chn.send(bytes!(sql)).await?;
509			}
510
511			// Increment the counter.
512			counter += 1;
513
514			// Commit the transaction at the end of each batch.
515			if counter % *EXPORT_BATCH_SIZE == 0 {
516				chn.send(bytes!("COMMIT;")).await?;
517			}
518		}
519
520		// Commit any remaining records if the last batch was not full.
521		if counter % *EXPORT_BATCH_SIZE != 0 {
522			chn.send(bytes!("COMMIT;")).await?;
523		}
524
525		// If there are no graph edge records, return early.
526		if records_relate.is_empty() {
527			return Ok(());
528		}
529
530		// Begin a new transaction for graph edge records.
531		chn.send(bytes!("BEGIN;")).await?;
532
533		// If there are graph edge records, send them to the channel.
534		if !records_relate.is_empty() {
535			for record in records_relate.iter() {
536				chn.send(bytes!(record)).await?;
537			}
538		}
539
540		// Commit the transaction for graph edge records.
541		chn.send(bytes!("COMMIT;")).await?;
542
543		Ok(())
544	}
545
546	/// Exports regular data to the provided channel.
547	///
548	/// This function processes a list of regular values, converting them into SQL commands
549	/// and sending them to the provided channel. It handles both normal records and graph edge records,
550	/// and ensures that the appropriate SQL commands are generated for each type of record.
551	///
552	/// # Arguments
553	///
554	/// * `regular_values` - A vector of tuples containing the regular values to be exported.
555	///   Each tuple consists of a key and a value.
556	/// * `chn` - A reference to the channel to which the SQL commands will be sent.
557	///
558	/// # Returns
559	///
560	/// * `Result<(), Error>` - Returns `Ok(())` if the operation is successful, or an `Error` if an error occurs.
561	async fn export_regular_data(
562		&self,
563		regular_values: Vec<(Vec<u8>, Vec<u8>)>,
564		chn: &Sender<Vec<u8>>,
565	) -> Result<(), Error> {
566		// Initialize vectors to hold normal records and graph edge records.
567		let mut records_normal = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
568		let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
569
570		// Process each regular value.
571		for (k, v) in regular_values {
572			let k = thing::Thing::decode(&k)?;
573			let v: Value = revision::from_slice(&v)?;
574			// Process the value and categorize it into records_relate or records_normal.
575			Self::process_value(k, v, &mut records_relate, &mut records_normal, None, None);
576		}
577
578		// If there are normal records, generate and send the INSERT SQL command.
579		if !records_normal.is_empty() {
580			let values = records_normal.join(", ");
581			let sql = format!("INSERT [ {} ];", values);
582			chn.send(bytes!(sql)).await?;
583		}
584
585		// If there are graph edge records, generate and send the INSERT RELATION SQL command.
586		if !records_relate.is_empty() {
587			let values = records_relate.join(", ");
588			let sql = format!("INSERT RELATION [ {} ];", values);
589			chn.send(bytes!(sql)).await?;
590		}
591
592		Ok(())
593	}
594}