1use super::KeyDecode as _;
2use super::Transaction;
3use crate::cnf::EXPORT_BATCH_SIZE;
4use crate::err::Error;
5use crate::key::thing;
6use crate::sql::paths::EDGE;
7use crate::sql::paths::IN;
8use crate::sql::paths::OUT;
9use crate::sql::statements::DefineTableStatement;
10use crate::sql::Value;
11use async_channel::Sender;
12use chrono::prelude::Utc;
13use chrono::TimeZone;
14use std::fmt;
15
16#[derive(Clone, Debug)]
17pub struct Config {
18 pub users: bool,
19 pub accesses: bool,
20 pub params: bool,
21 pub functions: bool,
22 pub analyzers: bool,
23 pub tables: TableConfig,
24 pub versions: bool,
25 pub records: bool,
26}
27
28impl Default for Config {
29 fn default() -> Config {
30 Config {
31 users: true,
32 accesses: true,
33 params: true,
34 functions: true,
35 analyzers: true,
36 tables: TableConfig::default(),
37 versions: false,
38 records: true,
39 }
40 }
41}
42
43impl From<Config> for Value {
44 fn from(config: Config) -> Value {
45 let obj = map!(
46 "users" => config.users.into(),
47 "accesses" => config.accesses.into(),
48 "params" => config.params.into(),
49 "functions" => config.functions.into(),
50 "analyzers" => config.analyzers.into(),
51 "versions" => config.versions.into(),
52 "records" => config.records.into(),
53 "tables" => match config.tables {
54 TableConfig::All => true.into(),
55 TableConfig::None => false.into(),
56 TableConfig::Some(v) => v.into()
57 }
58 );
59
60 obj.into()
61 }
62}
63
64impl TryFrom<&Value> for Config {
65 type Error = Error;
66 fn try_from(value: &Value) -> Result<Self, Self::Error> {
67 match value {
68 Value::Object(obj) => {
69 let mut config = Config::default();
70
71 macro_rules! bool_prop {
72 ($prop:ident) => {{
73 match obj.get(stringify!($prop)) {
74 Some(Value::Bool(v)) => {
75 config.$prop = v.to_owned();
76 }
77 Some(v) => {
78 return Err(Error::InvalidExportConfig(
79 v.to_owned(),
80 "a bool".into(),
81 ))
82 }
83 _ => (),
84 }
85 }};
86 }
87
88 bool_prop!(users);
89 bool_prop!(accesses);
90 bool_prop!(params);
91 bool_prop!(functions);
92 bool_prop!(analyzers);
93 bool_prop!(versions);
94 bool_prop!(records);
95
96 if let Some(v) = obj.get("tables") {
97 config.tables = v.try_into()?;
98 }
99
100 Ok(config)
101 }
102 v => Err(Error::InvalidExportConfig(v.to_owned(), "an object".into())),
103 }
104 }
105}
106
/// Selects which tables an export covers.
#[derive(Debug, Clone, Default)]
pub enum TableConfig {
    /// Every table is exported. This is the default.
    #[default]
    All,
    /// No table is exported.
    None,
    /// Only the tables whose names are listed are exported.
    Some(Vec<String>),
}
114
115impl From<bool> for TableConfig {
116 fn from(value: bool) -> Self {
117 match value {
118 true => TableConfig::All,
119 false => TableConfig::None,
120 }
121 }
122}
123
124impl From<Vec<String>> for TableConfig {
125 fn from(value: Vec<String>) -> Self {
126 TableConfig::Some(value)
127 }
128}
129
130impl From<Vec<&str>> for TableConfig {
131 fn from(value: Vec<&str>) -> Self {
132 TableConfig::Some(value.into_iter().map(ToOwned::to_owned).collect())
133 }
134}
135
136impl TryFrom<&Value> for TableConfig {
137 type Error = Error;
138 fn try_from(value: &Value) -> Result<Self, Self::Error> {
139 match value {
140 Value::Bool(b) => match b {
141 true => Ok(TableConfig::All),
142 false => Ok(TableConfig::None),
143 },
144 Value::None | Value::Null => Ok(TableConfig::None),
145 Value::Array(v) => v
146 .iter()
147 .cloned()
148 .map(|v| match v {
149 Value::Strand(str) => Ok(str.0),
150 v => Err(Error::InvalidExportConfig(v.to_owned(), "a string".into())),
151 })
152 .collect::<Result<Vec<String>, Error>>()
153 .map(TableConfig::Some),
154 v => Err(Error::InvalidExportConfig(
155 v.to_owned(),
156 "a bool, none, null or array<string>".into(),
157 )),
158 }
159 }
160}
161
162impl TableConfig {
163 pub(crate) fn is_any(&self) -> bool {
165 matches!(self, Self::All | Self::Some(_))
166 }
167 pub(crate) fn includes(&self, table: &str) -> bool {
169 match self {
170 Self::All => true,
171 Self::None => false,
172 Self::Some(v) => v.iter().any(|v| v.eq(table)),
173 }
174 }
175}
176
/// `fmt::Write` adapter that escapes line-terminating characters before
/// forwarding text to the wrapped writer, so the output stays on one line.
struct InlineCommentWriter<'a, F>(&'a mut F);

impl<F: fmt::Write> fmt::Write for InlineCommentWriter<'_, F> {
    /// Writes `s` one character at a time through [`Self::write_char`].
    fn write_str(&mut self, s: &str) -> fmt::Result {
        s.chars().try_for_each(|c| self.write_char(c))
    }

    /// Writes `c`, replacing `\n`, `\r`, U+0085, U+2028 and U+2029 with their
    /// textual escape sequences.
    fn write_char(&mut self, c: char) -> fmt::Result {
        let escaped = match c {
            '\n' => "\\n",
            '\r' => "\\r",
            '\u{0085}' => "\\u{0085}",
            '\u{2028}' => "\\u{2028}",
            '\u{2029}' => "\\u{2029}",
            // Everything else passes through untouched.
            _ => return self.0.write_char(c),
        };
        self.0.write_str(escaped)
    }
}
200
201struct InlineCommentDisplay<F>(F);
202impl<F: fmt::Display> fmt::Display for InlineCommentDisplay<F> {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 fmt::Write::write_fmt(&mut InlineCommentWriter(f), format_args!("{}", self.0))
205 }
206}
207
208impl Transaction {
209 pub async fn export(
211 &self,
212 ns: &str,
213 db: &str,
214 cfg: Config,
215 chn: Sender<Vec<u8>>,
216 ) -> Result<(), Error> {
217 self.export_metadata(&cfg, &chn, ns, db).await?;
219 self.export_tables(ns, db, &cfg, &chn).await?;
221 Ok(())
222 }
223
224 async fn export_metadata(
225 &self,
226 cfg: &Config,
227 chn: &Sender<Vec<u8>>,
228 ns: &str,
229 db: &str,
230 ) -> Result<(), Error> {
231 self.export_section("OPTION", vec!["OPTION IMPORT"], chn).await?;
233
234 if cfg.users {
236 let users = self.all_db_users(ns, db).await?;
237 self.export_section("USERS", users.to_vec(), chn).await?;
238 }
239
240 if cfg.accesses {
242 let accesses = self.all_db_accesses(ns, db).await?;
243 self.export_section("ACCESSES", accesses.to_vec(), chn).await?;
244 }
245
246 if cfg.params {
248 let params = self.all_db_params(ns, db).await?;
249 self.export_section("PARAMS", params.to_vec(), chn).await?;
250 }
251
252 if cfg.functions {
254 let functions = self.all_db_functions(ns, db).await?;
255 self.export_section("FUNCTIONS", functions.to_vec(), chn).await?;
256 }
257
258 if cfg.analyzers {
260 let analyzers = self.all_db_analyzers(ns, db).await?;
261 self.export_section("ANALYZERS", analyzers.to_vec(), chn).await?;
262 }
263
264 Ok(())
265 }
266
267 async fn export_section<T: ToString>(
268 &self,
269 title: &str,
270 items: Vec<T>,
271 chn: &Sender<Vec<u8>>,
272 ) -> Result<(), Error> {
273 if items.is_empty() {
274 return Ok(());
275 }
276
277 chn.send(bytes!("-- ------------------------------")).await?;
278 chn.send(bytes!(format!("-- {}", InlineCommentDisplay(title)))).await?;
279 chn.send(bytes!("-- ------------------------------")).await?;
280 chn.send(bytes!("")).await?;
281
282 for item in items {
283 chn.send(bytes!(format!("{};", item.to_string()))).await?;
284 }
285
286 chn.send(bytes!("")).await?;
287 Ok(())
288 }
289
290 async fn export_tables(
291 &self,
292 ns: &str,
293 db: &str,
294 cfg: &Config,
295 chn: &Sender<Vec<u8>>,
296 ) -> Result<(), Error> {
297 if !cfg.tables.is_any() {
299 return Ok(());
300 }
301 let tables = self.all_tb(ns, db, None).await?;
303 for table in tables.iter() {
305 if !cfg.tables.includes(&table.name) {
307 continue;
308 }
309 self.export_table_structure(ns, db, table, chn).await?;
311 if cfg.records {
313 self.export_table_data(ns, db, table, cfg, chn).await?;
314 }
315 }
316
317 Ok(())
318 }
319
320 async fn export_table_structure(
321 &self,
322 ns: &str,
323 db: &str,
324 table: &DefineTableStatement,
325 chn: &Sender<Vec<u8>>,
326 ) -> Result<(), Error> {
327 chn.send(bytes!("-- ------------------------------")).await?;
328 chn.send(bytes!(format!("-- TABLE: {}", InlineCommentDisplay(&table.name)))).await?;
329 chn.send(bytes!("-- ------------------------------")).await?;
330 chn.send(bytes!("")).await?;
331 chn.send(bytes!(format!("{};", table))).await?;
332 chn.send(bytes!("")).await?;
333 let fields = self.all_tb_fields(ns, db, &table.name, None).await?;
335 for field in fields.iter() {
336 chn.send(bytes!(format!("{};", field))).await?;
337 }
338 chn.send(bytes!("")).await?;
339 let indexes = self.all_tb_indexes(ns, db, &table.name).await?;
341 for index in indexes.iter() {
342 chn.send(bytes!(format!("{};", index))).await?;
343 }
344 chn.send(bytes!("")).await?;
345 let events = self.all_tb_events(ns, db, &table.name).await?;
347 for event in events.iter() {
348 chn.send(bytes!(format!("{};", event))).await?;
349 }
350 chn.send(bytes!("")).await?;
351 Ok(())
353 }
354
355 async fn export_table_data(
356 &self,
357 ns: &str,
358 db: &str,
359 table: &DefineTableStatement,
360 cfg: &Config,
361 chn: &Sender<Vec<u8>>,
362 ) -> Result<(), Error> {
363 chn.send(bytes!("-- ------------------------------")).await?;
364 chn.send(bytes!(format!("-- TABLE DATA: {}", InlineCommentDisplay(&table.name)))).await?;
365 chn.send(bytes!("-- ------------------------------")).await?;
366 chn.send(bytes!("")).await?;
367
368 let beg = crate::key::thing::prefix(ns, db, &table.name)?;
369 let end = crate::key::thing::suffix(ns, db, &table.name)?;
370 let mut next = Some(beg..end);
371
372 while let Some(rng) = next {
373 if cfg.versions {
374 let batch = self.batch_keys_vals_versions(rng, *EXPORT_BATCH_SIZE).await?;
375 next = batch.next;
376 if batch.result.is_empty() {
378 break;
379 }
380 self.export_versioned_data(batch.result, chn).await?;
381 } else {
382 let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
383 next = batch.next;
384 if batch.result.is_empty() {
386 break;
387 }
388 self.export_regular_data(batch.result, chn).await?;
389 }
390 continue;
392 }
393
394 chn.send(bytes!("")).await?;
395 Ok(())
396 }
397
398 fn process_value(
415 k: thing::Thing,
416 v: Value,
417 records_relate: &mut Vec<String>,
418 records_normal: &mut Vec<String>,
419 is_tombstone: Option<bool>,
420 version: Option<u64>,
421 ) -> String {
422 match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
424 (Value::Bool(true), Value::Thing(_), Value::Thing(_)) => {
426 if let Some(version) = version {
427 let ts = Utc.timestamp_nanos(version as i64);
429 let sql = format!("INSERT RELATION {} VERSION d'{:?}';", v, ts);
430 records_relate.push(sql);
431 String::new()
432 } else {
433 records_relate.push(v.to_string());
435 String::new()
436 }
437 }
438 _ => {
440 if let Some(is_tombstone) = is_tombstone {
441 if is_tombstone {
442 format!("DELETE {}:{};", k.tb, k.id)
444 } else {
445 let ts = Utc.timestamp_nanos(version.unwrap() as i64);
447 format!("INSERT {} VERSION d'{:?}';", v, ts)
448 }
449 } else {
450 records_normal.push(v.to_string());
452 String::new()
453 }
454 }
455 }
456 }
457
458 async fn export_versioned_data(
474 &self,
475 versioned_values: Vec<(Vec<u8>, Vec<u8>, u64, bool)>,
476 chn: &Sender<Vec<u8>>,
477 ) -> Result<(), Error> {
478 let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
480
481 let mut counter = 0;
483
484 for (k, v, version, is_tombstone) in versioned_values {
486 if counter % *EXPORT_BATCH_SIZE == 0 {
488 chn.send(bytes!("BEGIN;")).await?;
489 }
490
491 let k = thing::Thing::decode(&k)?;
492 let v: Value = if v.is_empty() {
493 Value::None
494 } else {
495 revision::from_slice(&v)?
496 };
497 let sql = Self::process_value(
499 k,
500 v,
501 &mut records_relate,
502 &mut Vec::new(),
503 Some(is_tombstone),
504 Some(version),
505 );
506 if !sql.is_empty() {
508 chn.send(bytes!(sql)).await?;
509 }
510
511 counter += 1;
513
514 if counter % *EXPORT_BATCH_SIZE == 0 {
516 chn.send(bytes!("COMMIT;")).await?;
517 }
518 }
519
520 if counter % *EXPORT_BATCH_SIZE != 0 {
522 chn.send(bytes!("COMMIT;")).await?;
523 }
524
525 if records_relate.is_empty() {
527 return Ok(());
528 }
529
530 chn.send(bytes!("BEGIN;")).await?;
532
533 if !records_relate.is_empty() {
535 for record in records_relate.iter() {
536 chn.send(bytes!(record)).await?;
537 }
538 }
539
540 chn.send(bytes!("COMMIT;")).await?;
542
543 Ok(())
544 }
545
546 async fn export_regular_data(
562 &self,
563 regular_values: Vec<(Vec<u8>, Vec<u8>)>,
564 chn: &Sender<Vec<u8>>,
565 ) -> Result<(), Error> {
566 let mut records_normal = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
568 let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
569
570 for (k, v) in regular_values {
572 let k = thing::Thing::decode(&k)?;
573 let v: Value = revision::from_slice(&v)?;
574 Self::process_value(k, v, &mut records_relate, &mut records_normal, None, None);
576 }
577
578 if !records_normal.is_empty() {
580 let values = records_normal.join(", ");
581 let sql = format!("INSERT [ {} ];", values);
582 chn.send(bytes!(sql)).await?;
583 }
584
585 if !records_relate.is_empty() {
587 let values = records_relate.join(", ");
588 let sql = format!("INSERT RELATION [ {} ];", values);
589 chn.send(bytes!(sql)).await?;
590 }
591
592 Ok(())
593 }
594}