use crate::{IndexerConfig, IndexerError, IndexerResult, Manifest};
use fuel_indexer_database::{
queries, IndexerConnection, IndexerConnectionPool, IndexerDatabaseError,
};
use fuel_indexer_lib::{
fully_qualified_namespace, graphql::types::IdCol, utils::format_sql_query,
};
use fuel_indexer_schema::FtColumn;
use std::collections::HashMap;
use tracing::{debug, error, info};
/// Per-indexer database handle.
///
/// Bundles the connection pool, the single connection stashed by
/// `start_transaction`, and the table/column lookups filled in by
/// `load_schema`.
#[derive(Debug)]
pub struct Database {
/// Pool from which connections are acquired (`start_transaction`, `load_schema`).
pool: IndexerConnectionPool,
/// Connection stored by `start_transaction`; `None` until that is called.
/// Every read/write method below requires it to be `Some`.
stashed: Option<IndexerConnection>,
/// Namespace copied from the manifest in `new`.
namespace: String,
/// Identifier copied from the manifest in `new`.
identifier: String,
/// Schema version; empty until `load_schema` sets it.
version: String,
/// Fully qualified table name -> column names, in the order `load_schema` received them.
schema: HashMap<String, Vec<String>>,
/// Type ID -> fully qualified table name, filled by `load_schema`.
tables: HashMap<i64, String>,
/// Copy of the indexer configuration; `verbose` controls query logging.
config: IndexerConfig,
}
/// Report whether `columns` describes a table with nothing to update on conflict.
///
/// True only when there are exactly two columns and the first one is the ID
/// column name (presumably the second is the serialized `object` column —
/// confirm against the schema builder).
fn is_id_only_upsert(columns: &[String]) -> bool {
    match columns {
        [first, _] => *first == IdCol::to_lowercase_string(),
        _ => false,
    }
}
impl Database {
pub async fn new(
pool: IndexerConnectionPool,
manifest: &Manifest,
config: &IndexerConfig,
) -> Database {
Database {
pool,
stashed: None,
namespace: manifest.namespace().to_string(),
identifier: manifest.identifier().to_string(),
version: Default::default(),
schema: Default::default(),
tables: Default::default(),
config: config.clone(),
}
}
pub async fn start_transaction(&mut self) -> IndexerResult<usize> {
let conn = self.pool.acquire().await?;
self.stashed = Some(conn);
debug!("Connection stashed as: {:?}", self.stashed);
let conn =
self.stashed
.as_mut()
.ok_or(crate::IndexerError::NoTransactionError(
"start_transaction".to_string(),
))?;
let result = queries::start_transaction(conn).await?;
Ok(result)
}
pub async fn commit_transaction(&mut self) -> IndexerResult<usize> {
let conn =
self.stashed
.as_mut()
.ok_or(crate::IndexerError::NoTransactionError(
"commit_transaction".to_string(),
))?;
let res = queries::commit_transaction(conn).await?;
Ok(res)
}
pub async fn revert_transaction(&mut self) -> IndexerResult<usize> {
let conn =
self.stashed
.as_mut()
.ok_or(crate::IndexerError::NoTransactionError(
"revert_transaction".to_string(),
))?;
let res = queries::revert_transaction(conn).await?;
Ok(res)
}
/// Build the `INSERT ... ON CONFLICT(id)` statement for one row of `table`.
///
/// `columns` are the column names, `inserts` the value fragments, and
/// `updates` the `col = value` fragments. A `$1::bytea` placeholder is
/// appended after the value fragments (presumably bound to the serialized
/// object by `queries::put_object` — confirm).
///
/// When `is_id_only_upsert(columns)` holds there is nothing to update, so the
/// statement ends in `DO NOTHING` and `updates` is ignored.
fn upsert_query(
    &self,
    table: &str,
    columns: &[String],
    inserts: Vec<String>,
    updates: Vec<String>,
) -> String {
    let column_list = columns.join(", ");
    let value_list = inserts.join(", ");

    if is_id_only_upsert(columns) {
        return format!(
            "INSERT INTO {} ({}) VALUES ({}, $1::bytea) ON CONFLICT(id) DO NOTHING",
            table, column_list, value_list,
        );
    }

    let assignments = updates.join(", ");
    format!(
        "INSERT INTO {} ({}) VALUES ({}, $1::bytea) ON CONFLICT(id) DO UPDATE SET {}",
        table, column_list, value_list, assignments,
    )
}
/// Build the `SELECT` that fetches the `object` column of the row whose `id`
/// equals `object_id`, logging it at info level when `config.verbose` is set.
///
/// NOTE(review): `object_id` is interpolated into the SQL text inside single
/// quotes rather than bound as a parameter; callers must not pass untrusted
/// text here.
fn get_query(&self, table: &str, object_id: &str) -> String {
    let sql = format!("SELECT object from {table} where id = '{object_id}'");

    if self.config.verbose {
        info!("{sql}");
    }

    sql
}
/// Upsert one object into the table registered for `type_id`.
///
/// `columns` supply the value fragments for the statement and `bytes` is
/// passed to `queries::put_object` alongside the query text. The statement
/// is logged at info level when `config.verbose` is set.
///
/// # Errors
///
/// * `type_id` has no table mapping (see `load_schema`).
/// * No connection has been stashed (`NoTransactionError`).
/// * The database rejects the statement.
pub async fn put_object(
    &mut self,
    type_id: i64,
    columns: Vec<FtColumn>,
    bytes: Vec<u8>,
) -> IndexerResult<()> {
    let table = match self.tables.get(&type_id) {
        Some(t) => t,
        None => {
            return Err(anyhow::anyhow!(
                r#"TypeId({type_id}) not found in tables: {:?}.
Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema?
Do your WASM modules need to be rebuilt?
"#,
                self.tables,
            ).into());
        }
    };

    // Borrow the column names rather than cloning the whole Vec on every
    // upsert: everything up to `query_text` only needs shared access, and the
    // borrow ends before the stashed connection is borrowed mutably.
    let column_names = &self.schema[table];

    let inserts: Vec<_> = columns.iter().map(|col| col.query_fragment()).collect();

    // `zip` stops at the shorter side, so schema columns with no matching
    // value fragment are left out of the SET list.
    let updates: Vec<_> = column_names
        .iter()
        .zip(columns.iter())
        .map(|(colname, value)| format!("{colname} = {}", value.query_fragment()))
        .collect();

    let query_text =
        format_sql_query(self.upsert_query(table, column_names, inserts, updates));

    // Build the error lazily so the success path does not allocate its message.
    let conn = self
        .stashed
        .as_mut()
        .ok_or_else(|| IndexerError::NoTransactionError("put_object".to_string()))?;

    if self.config.verbose {
        info!("{query_text}");
    }

    queries::put_object(conn, query_text, bytes).await?;

    Ok(())
}
pub async fn get_object(
&mut self,
type_id: i64,
object_id: String,
) -> IndexerResult<Option<Vec<u8>>> {
let table = &self
.tables
.get(&type_id)
.ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?;
let query = self.get_query(table, &object_id);
let conn = self
.stashed
.as_mut()
.ok_or(IndexerError::NoTransactionError("get_object".to_string()))?;
match queries::get_object(conn, query).await {
Ok(v) => Ok(Some(v)),
Err(e) => {
if let sqlx::Error::RowNotFound = e {
debug!("Row not found for object ID: {object_id}");
} else {
error!("Failed to get_object: {e:?}");
}
Ok(None)
}
}
}
pub async fn find_many(
&mut self,
type_id: i64,
constraints: String,
) -> IndexerResult<Vec<Vec<u8>>> {
let table = &self
.tables
.get(&type_id)
.ok_or(IndexerDatabaseError::TableMappingDoesNotExist(type_id))?;
let query = format!("SELECT object from {table} WHERE {constraints}");
let conn = self
.stashed
.as_mut()
.ok_or(IndexerError::NoTransactionError("find_many".to_string()))?;
match queries::get_objects(conn, query).await {
Ok(v) => Ok(v),
Err(e) => {
if let sqlx::Error::RowNotFound = e {
debug!("Row not found");
} else {
error!("Failed to get_objects: {e:?}");
}
Ok(vec![])
}
}
}
/// Record `version` and load this indexer's table and column lookups.
///
/// Uses a freshly acquired pool connection (not the stashed one) to fetch the
/// column rows for namespace/identifier/version. For each row the table name
/// is `<fully qualified namespace>.<table_name>`; the first table seen for a
/// type ID is kept, and the column name is appended to that table's list.
///
/// NOTE(review): the lookups are never cleared here, so calling this more
/// than once would append the same column names again — confirm it is only
/// called once per instance.
///
/// # Errors
///
/// Fails if a connection cannot be acquired or the schema query fails. The
/// stored version has already been overwritten at that point.
pub async fn load_schema(&mut self, version: String) -> IndexerResult<()> {
    self.version = version;

    info!(
        "Database loading schema for Indexer({}.{}) with Version({}).",
        self.namespace, self.identifier, self.version
    );

    let mut conn = self.pool.acquire().await?;
    let rows = queries::columns_get_schema(
        &mut conn,
        &self.namespace,
        &self.identifier,
        &self.version,
    )
    .await?;

    for row in rows {
        let qualified_table = format!(
            "{}.{}",
            fully_qualified_namespace(&self.namespace, &self.identifier),
            &row.table_name
        );

        self.tables
            .entry(row.type_id)
            .or_insert_with(|| qualified_table.clone());

        self.schema
            .entry(qualified_table)
            .or_default()
            .push(row.column_name);
    }

    Ok(())
}
/// Namespace taken from the manifest when this `Database` was created.
pub fn namespace(&self) -> &str {
    self.namespace.as_str()
}
/// Schema version last passed to `load_schema`; empty before the first call.
pub fn version(&self) -> &str {
    self.version.as_str()
}
/// Column names per fully qualified table name, as populated by `load_schema`.
pub fn schema(&self) -> &HashMap<String, Vec<String>> {
&self.schema
}
/// Run each statement in `queries`, in order, on the stashed connection.
///
/// Each statement is logged at info level when `config.verbose` is set.
/// Execution stops at the first statement that fails.
///
/// # Errors
///
/// `NoTransactionError` if no connection has been stashed, or the error of
/// the first failing statement.
pub async fn put_many_to_many_record(
    &mut self,
    queries: Vec<String>,
) -> IndexerResult<()> {
    let conn = self.stashed.as_mut().ok_or_else(|| {
        IndexerError::NoTransactionError("put_many_to_many_record".to_string())
    })?;
    let verbose = self.config.verbose;

    // The `queries` parameter (a value) and the `queries` module (a path)
    // live in different namespaces, so both names resolve below.
    for query in queries {
        if verbose {
            info!("{query}");
        }
        queries::put_many_to_many_record(conn, query).await?;
    }

    Ok(())
}
}