use super::KeyDecode as _;
use super::Transaction;
use crate::cnf::EXPORT_BATCH_SIZE;
use crate::err::Error;
use crate::key::thing;
use crate::sql::paths::EDGE;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::statements::DefineTableStatement;
use crate::sql::Value;
use async_channel::Sender;
use chrono::prelude::Utc;
use chrono::TimeZone;
/// Options selecting which parts of a database are written by an export.
///
/// Each boolean flag gates one section of the output; `tables` narrows the
/// set of tables that are considered at all.
#[derive(Clone, Debug)]
pub struct Config {
/// Emit the database users (the `USERS` section).
pub users: bool,
/// Emit the database accesses (the `ACCESSES` section).
pub accesses: bool,
/// Emit the database params (the `PARAMS` section).
pub params: bool,
/// Emit the database functions (the `FUNCTIONS` section).
pub functions: bool,
/// Emit the database analyzers (the `ANALYZERS` section).
pub analyzers: bool,
/// Which tables take part in the export.
pub tables: TableConfig,
/// When set, table data is read through the versioned batch scan and
/// written as per-version statements instead of bulk inserts.
pub versions: bool,
/// Emit each table's data after its definition.
pub records: bool,
}
impl Default for Config {
    /// Returns the default export configuration: every metadata section,
    /// every table and all record data are exported, while versioned
    /// output is switched off.
    fn default() -> Self {
        Self {
            // The single opt-in flag.
            versions: false,
            // Table selection falls back to the enum's own default.
            tables: TableConfig::default(),
            // Everything else is exported unless explicitly disabled.
            users: true,
            accesses: true,
            params: true,
            functions: true,
            analyzers: true,
            records: true,
        }
    }
}
impl From<Config> for Value {
    /// Serializes an export configuration into an object value.
    ///
    /// Every boolean flag is written under the key of the same name, and
    /// `tables` is written as `true` (all tables), `false` (no tables) or
    /// the list of selected table names.
    ///
    /// The `records` flag is included so that the object produced here
    /// converts back into an equal configuration through
    /// `Config::try_from`; omitting it would silently reset `records` to
    /// its default of `true` on the way back.
    fn from(config: Config) -> Value {
        let obj = map!(
            "users" => config.users.into(),
            "accesses" => config.accesses.into(),
            "params" => config.params.into(),
            "functions" => config.functions.into(),
            "analyzers" => config.analyzers.into(),
            "versions" => config.versions.into(),
            "records" => config.records.into(),
            "tables" => match config.tables {
                TableConfig::All => true.into(),
                TableConfig::None => false.into(),
                TableConfig::Some(v) => v.into()
            }
        );
        obj.into()
    }
}
impl TryFrom<&Value> for Config {
type Error = Error;
fn try_from(value: &Value) -> Result<Self, Self::Error> {
match value {
Value::Object(obj) => {
let mut config = Config::default();
macro_rules! bool_prop {
($prop:ident) => {{
match obj.get(stringify!($prop)) {
Some(Value::Bool(v)) => {
config.$prop = v.to_owned();
}
Some(v) => {
return Err(Error::InvalidExportConfig(
v.to_owned(),
"a bool".into(),
))
}
_ => (),
}
}};
}
bool_prop!(users);
bool_prop!(accesses);
bool_prop!(params);
bool_prop!(functions);
bool_prop!(analyzers);
bool_prop!(versions);
bool_prop!(records);
if let Some(v) = obj.get("tables") {
config.tables = v.try_into()?;
}
Ok(config)
}
v => Err(Error::InvalidExportConfig(v.to_owned(), "an object".into())),
}
}
}
/// Selects which tables take part in an export.
#[derive(Clone, Debug, Default)]
pub enum TableConfig {
/// Every table is included (the default).
#[default]
All,
/// No table is included.
None,
/// Only tables whose name appears in the list are included.
Some(Vec<String>),
}
impl From<bool> for TableConfig {
fn from(value: bool) -> Self {
match value {
true => TableConfig::All,
false => TableConfig::None,
}
}
}
impl From<Vec<String>> for TableConfig {
fn from(value: Vec<String>) -> Self {
TableConfig::Some(value)
}
}
impl From<Vec<&str>> for TableConfig {
fn from(value: Vec<&str>) -> Self {
TableConfig::Some(value.into_iter().map(ToOwned::to_owned).collect())
}
}
impl TryFrom<&Value> for TableConfig {
type Error = Error;
fn try_from(value: &Value) -> Result<Self, Self::Error> {
match value {
Value::Bool(b) => match b {
true => Ok(TableConfig::All),
false => Ok(TableConfig::None),
},
Value::None | Value::Null => Ok(TableConfig::None),
Value::Array(v) => v
.iter()
.cloned()
.map(|v| match v {
Value::Strand(str) => Ok(str.0),
v => Err(Error::InvalidExportConfig(v.to_owned(), "a string".into())),
})
.collect::<Result<Vec<String>, Error>>()
.map(TableConfig::Some),
v => Err(Error::InvalidExportConfig(
v.to_owned(),
"a bool, none, null or array<string>".into(),
)),
}
}
}
impl TableConfig {
    /// Reports whether this selection can include any table at all.
    ///
    /// Only [`TableConfig::None`] answers `false`; an explicit list counts
    /// as a selection even when it is empty.
    pub(crate) fn is_any(&self) -> bool {
        match self {
            Self::None => false,
            Self::All | Self::Some(_) => true,
        }
    }

    /// Reports whether the table called `table` is part of this selection.
    ///
    /// Names in an explicit list are compared for exact equality.
    pub(crate) fn includes(&self, table: &str) -> bool {
        match self {
            Self::Some(names) => names.iter().any(|name| name.as_str() == table),
            Self::All => true,
            Self::None => false,
        }
    }
}
impl Transaction {
/// Writes an export of database `db` in namespace `ns` to the channel
/// `chn`, one chunk of bytes per message.
///
/// The metadata sections are written first, then the tables; `cfg`
/// decides which of them are included.
///
/// # Errors
///
/// Returns the first error raised by either step.
pub async fn export(
    &self,
    ns: &str,
    db: &str,
    cfg: Config,
    chn: Sender<Vec<u8>>,
) -> Result<(), Error> {
    let (cfg, chn) = (&cfg, &chn);
    // Definitions that are not tied to a table come first.
    self.export_metadata(cfg, chn, ns, db).await?;
    // Then the table definitions and, optionally, their data.
    self.export_tables(ns, db, cfg, chn).await
}
/// Writes the database-level sections of the export.
///
/// The `OPTION` section is always written. The `USERS`, `ACCESSES`,
/// `PARAMS`, `FUNCTIONS` and `ANALYZERS` sections follow in that order,
/// each one only when its flag in `cfg` is set.
///
/// # Errors
///
/// Returns the first error raised while reading definitions or sending
/// to `chn`.
async fn export_metadata(
    &self,
    cfg: &Config,
    chn: &Sender<Vec<u8>>,
    ns: &str,
    db: &str,
) -> Result<(), Error> {
    self.export_section("OPTION", vec!["OPTION IMPORT"], chn).await?;
    if cfg.users {
        self.export_section("USERS", self.all_db_users(ns, db).await?.to_vec(), chn).await?;
    }
    if cfg.accesses {
        self.export_section("ACCESSES", self.all_db_accesses(ns, db).await?.to_vec(), chn)
            .await?;
    }
    if cfg.params {
        self.export_section("PARAMS", self.all_db_params(ns, db).await?.to_vec(), chn).await?;
    }
    if cfg.functions {
        self.export_section("FUNCTIONS", self.all_db_functions(ns, db).await?.to_vec(), chn)
            .await?;
    }
    if cfg.analyzers {
        self.export_section("ANALYZERS", self.all_db_analyzers(ns, db).await?.to_vec(), chn)
            .await?;
    }
    Ok(())
}
/// Writes one titled section: a three-line comment banner, a blank
/// line, every item followed by `;`, and a closing blank line.
///
/// Nothing at all is written when `items` is empty.
///
/// # Errors
///
/// Returns an error as soon as sending to `chn` fails.
async fn export_section<T: ToString>(
    &self,
    title: &str,
    items: Vec<T>,
    chn: &Sender<Vec<u8>>,
) -> Result<(), Error> {
    // An empty section produces no banner either.
    if items.is_empty() {
        return Ok(());
    }
    let rule = "-- ------------------------------";
    let heading = format!("-- {}", title);
    for line in [rule, heading.as_str(), rule, ""] {
        chn.send(bytes!(line)).await?;
    }
    for item in items {
        chn.send(bytes!(format!("{};", item.to_string()))).await?;
    }
    chn.send(bytes!("")).await?;
    Ok(())
}
/// Writes every selected table: its structure first and, when
/// `cfg.records` is set, its data afterwards.
///
/// Tables are visited in the order returned by `all_tb`; those not
/// included by `cfg.tables` are skipped. No tables are fetched when the
/// selection cannot match anything.
///
/// # Errors
///
/// Returns the first error raised while reading or writing a table.
async fn export_tables(
    &self,
    ns: &str,
    db: &str,
    cfg: &Config,
    chn: &Sender<Vec<u8>>,
) -> Result<(), Error> {
    if cfg.tables.is_any() {
        let tables = self.all_tb(ns, db, None).await?;
        let selected = tables.iter().filter(|table| cfg.tables.includes(&table.name));
        for table in selected {
            self.export_table_structure(ns, db, table, chn).await?;
            if cfg.records {
                self.export_table_data(ns, db, table, cfg, chn).await?;
            }
        }
    }
    Ok(())
}
/// Writes the definition of one table.
///
/// The output is a `TABLE: <name>` banner, the table definition, then
/// the table's field, index and event definitions. Each group is
/// followed by a blank line, and every definition is terminated by `;`.
///
/// # Errors
///
/// Returns the first error raised while reading definitions or sending
/// to `chn`.
async fn export_table_structure(
    &self,
    ns: &str,
    db: &str,
    table: &DefineTableStatement,
    chn: &Sender<Vec<u8>>,
) -> Result<(), Error> {
    let rule = "-- ------------------------------";
    let heading = format!("-- TABLE: {}", table.name);
    let definition = format!("{};", table);
    // Banner, blank line, table definition, blank line.
    for line in [rule, heading.as_str(), rule, "", definition.as_str(), ""] {
        chn.send(bytes!(line)).await?;
    }
    // Field definitions.
    for field in self.all_tb_fields(ns, db, &table.name, None).await?.iter() {
        chn.send(bytes!(format!("{};", field))).await?;
    }
    chn.send(bytes!("")).await?;
    // Index definitions.
    for index in self.all_tb_indexes(ns, db, &table.name).await?.iter() {
        chn.send(bytes!(format!("{};", index))).await?;
    }
    chn.send(bytes!("")).await?;
    // Event definitions.
    for event in self.all_tb_events(ns, db, &table.name).await?.iter() {
        chn.send(bytes!(format!("{};", event))).await?;
    }
    chn.send(bytes!("")).await?;
    Ok(())
}
/// Writes the records of one table.
///
/// After a `TABLE DATA: <name>` banner, the table's key range is read in
/// batches of `EXPORT_BATCH_SIZE`. With `cfg.versions` set, the versioned
/// scan is used and each batch goes through `export_versioned_data`;
/// otherwise the plain scan feeds `export_regular_data`. Reading stops
/// when a batch is empty or no further range is reported. A blank line
/// closes the section.
///
/// # Errors
///
/// Returns the first error raised while building the key range,
/// scanning, or writing a batch.
async fn export_table_data(
    &self,
    ns: &str,
    db: &str,
    table: &DefineTableStatement,
    cfg: &Config,
    chn: &Sender<Vec<u8>>,
) -> Result<(), Error> {
    let rule = "-- ------------------------------";
    let heading = format!("-- TABLE DATA: {}", table.name);
    for line in [rule, heading.as_str(), rule, ""] {
        chn.send(bytes!(line)).await?;
    }
    let beg = crate::key::thing::prefix(ns, db, &table.name)?;
    let end = crate::key::thing::suffix(ns, db, &table.name)?;
    // Range still to be scanned; `None` once the table is exhausted.
    let mut pending = Some(beg..end);
    while let Some(range) = pending.take() {
        if cfg.versions {
            let page = self.batch_keys_vals_versions(range, *EXPORT_BATCH_SIZE).await?;
            if page.result.is_empty() {
                break;
            }
            pending = page.next;
            self.export_versioned_data(page.result, chn).await?;
        } else {
            let page = self.batch_keys_vals(range, *EXPORT_BATCH_SIZE, None).await?;
            if page.result.is_empty() {
                break;
            }
            pending = page.next;
            self.export_regular_data(page.result, chn).await?;
        }
    }
    chn.send(bytes!("")).await?;
    Ok(())
}
/// Turns one decoded record into export output.
///
/// A record counts as a graph edge when its `EDGE` field is `true` and
/// both its `IN` and `OUT` fields are `Value::Thing`. Edges are never
/// returned: they are appended to `records_relate`, either as the bare
/// value (no `version`) or as a complete `INSERT RELATION … VERSION …`
/// statement, and an empty string is returned.
///
/// For every other record the result depends on `is_tombstone`:
///
/// * `Some(true)`: returns a `DELETE <tb>:<id>;` statement built from `k`.
/// * `Some(false)`: returns an `INSERT … VERSION …` statement.
/// * `None`: appends the bare value to `records_normal` and returns an
///   empty string.
///
/// `version` is interpreted as a nanosecond timestamp in UTC.
///
/// # Panics
///
/// Panics when a non-edge record is passed with `is_tombstone` set to
/// `Some(false)` but `version` set to `None`.
fn process_value(
    k: thing::Thing,
    v: Value,
    records_relate: &mut Vec<String>,
    records_normal: &mut Vec<String>,
    is_tombstone: Option<bool>,
    version: Option<u64>,
) -> String {
    let is_edge = matches!(
        (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)),
        (Value::Bool(true), Value::Thing(_), Value::Thing(_))
    );
    if is_edge {
        let entry = match version {
            Some(version) => {
                let ts = Utc.timestamp_nanos(version as i64);
                format!("INSERT RELATION {} VERSION d'{:?}';", v, ts)
            }
            None => v.to_string(),
        };
        records_relate.push(entry);
        return String::new();
    }
    match is_tombstone {
        Some(true) => format!("DELETE {}:{};", k.tb, k.id),
        Some(false) => {
            // Callers on the versioned path always supply a version.
            let ts = Utc.timestamp_nanos(version.unwrap() as i64);
            format!("INSERT {} VERSION d'{:?}';", v, ts)
        }
        None => {
            records_normal.push(v.to_string());
            String::new()
        }
    }
}
/// Writes one batch of versioned records.
///
/// Each entry is `(key, value, version, is_tombstone)`; an empty value
/// is treated as `Value::None`. Statements returned by `process_value`
/// are sent as they are produced, wrapped in `BEGIN;` / `COMMIT;` pairs
/// that each cover at most `EXPORT_BATCH_SIZE` entries. Graph edge
/// statements are held back and sent afterwards inside one further
/// `BEGIN;` / `COMMIT;` pair, which is omitted when there are none.
///
/// # Errors
///
/// Returns the first error raised while decoding a key or sending to
/// `chn`.
async fn export_versioned_data(
    &self,
    versioned_values: Vec<(Vec<u8>, Vec<u8>, u64, bool)>,
    chn: &Sender<Vec<u8>>,
) -> Result<(), Error> {
    let batch_size = *EXPORT_BATCH_SIZE;
    let mut relations = Vec::with_capacity(batch_size as usize);
    let mut processed = 0;
    for (key, val, version, is_tombstone) in versioned_values {
        // Open a transaction at the start of every group.
        if processed % batch_size == 0 {
            chn.send(bytes!("BEGIN;")).await?;
        }
        let key = thing::Thing::decode(&key)?;
        let val: Value = match val.is_empty() {
            true => Value::None,
            false => (&val).into(),
        };
        // With a tombstone flag supplied, nothing is ever pushed to the
        // "normal" list, so a throwaway vector is enough.
        let mut unused_normal = Vec::new();
        let statement = Self::process_value(
            key,
            val,
            &mut relations,
            &mut unused_normal,
            Some(is_tombstone),
            Some(version),
        );
        if !statement.is_empty() {
            chn.send(bytes!(statement)).await?;
        }
        processed += 1;
        // Close the transaction once the group is full.
        if processed % batch_size == 0 {
            chn.send(bytes!("COMMIT;")).await?;
        }
    }
    // Close a trailing, partially filled group.
    if processed % batch_size != 0 {
        chn.send(bytes!("COMMIT;")).await?;
    }
    // Edge statements go last, in a transaction of their own.
    if !relations.is_empty() {
        chn.send(bytes!("BEGIN;")).await?;
        for relation in relations.iter() {
            chn.send(bytes!(relation)).await?;
        }
        chn.send(bytes!("COMMIT;")).await?;
    }
    Ok(())
}
async fn export_regular_data(
&self,
regular_values: Vec<(Vec<u8>, Vec<u8>)>,
chn: &Sender<Vec<u8>>,
) -> Result<(), Error> {
let mut records_normal = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
let mut records_relate = Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
for (k, v) in regular_values {
let k = thing::Thing::decode(&k)?;
let v: Value = (&v).into();
Self::process_value(k, v, &mut records_relate, &mut records_normal, None, None);
}
if !records_normal.is_empty() {
let values = records_normal.join(", ");
let sql = format!("INSERT [ {} ];", values);
chn.send(bytes!(sql)).await?;
}
if !records_relate.is_empty() {
let values = records_relate.join(", ");
let sql = format!("INSERT RELATION [ {} ];", values);
chn.send(bytes!(sql)).await?;
}
Ok(())
}
}