#![deny(unused_crate_dependencies)]
use bigdecimal::ToPrimitive;
use fuel_indexer_database_types::*;
use fuel_indexer_lib::utils::sha256_digest;
use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row};
use std::collections::HashMap;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::info;
#[cfg(feature = "metrics")]
use std::time::Instant;
#[cfg(feature = "metrics")]
use fuel_indexer_metrics::METRICS;
#[cfg(feature = "metrics")]
use fuel_indexer_macro_utils::metrics;
use chrono::{DateTime, NaiveDateTime, Utc};
const NONCE_EXPIRY_SECS: u64 = 3600;
#[cfg_attr(feature = "metrics", metrics)]
pub async fn put_object(
conn: &mut PoolConnection<Postgres>,
query: String,
bytes: Vec<u8>,
) -> sqlx::Result<usize> {
let mut builder = sqlx::QueryBuilder::new(query);
let query = builder.build();
let query = query.bind(bytes);
let result = query.execute(conn).await?;
Ok(result.rows_affected() as usize)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn get_object(
conn: &mut PoolConnection<Postgres>,
query: String,
) -> sqlx::Result<Vec<u8>> {
let mut builder = sqlx::QueryBuilder::new(query);
let query = builder.build();
let row = query.fetch_one(conn).await?;
Ok(row.get(0))
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn get_objects(
conn: &mut PoolConnection<Postgres>,
query: String,
) -> sqlx::Result<Vec<Vec<u8>>> {
let mut builder = sqlx::QueryBuilder::new(query);
let query = builder.build();
let rows = query.fetch_all(conn).await?;
let objects = rows.iter().map(|r| r.get(0)).collect::<Vec<Vec<u8>>>();
Ok(objects)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn run_migration(conn: &mut PoolConnection<Postgres>) -> sqlx::Result<()> {
sqlx::migrate!().run(conn).await?;
Ok(())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn run_query(
conn: &mut PoolConnection<Postgres>,
query: String,
) -> sqlx::Result<JsonValue> {
let mut builder = sqlx::QueryBuilder::new(query);
let query = builder.build();
Ok(query
.fetch_all(conn)
.await?
.iter()
.filter_map(|r| match r.try_get::<JsonValue, usize>(0) {
Ok(v) => Some(v),
Err(_e) => None,
})
.collect())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn execute_query(
conn: &mut PoolConnection<Postgres>,
query: String,
) -> sqlx::Result<usize> {
let mut builder = sqlx::QueryBuilder::new(query);
let query = builder.build();
let result = query.execute(conn).await?;
Ok(result.rows_affected() as usize)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn root_columns_list_by_id(
conn: &mut PoolConnection<Postgres>,
root_id: i64,
) -> sqlx::Result<Vec<RootColumn>> {
Ok(
sqlx::query("SELECT * FROM graph_registry_root_columns WHERE root_id = $1")
.bind(root_id)
.fetch_all(conn)
.await?
.into_iter()
.map(|row| {
let id: i64 = row.get(0);
let root_id: i64 = row.get(1);
let column_name: String = row.get(2);
let graphql_type: String = row.get(3);
RootColumn {
id,
root_id,
column_name,
graphql_type,
}
})
.collect::<Vec<RootColumn>>(),
)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn new_root_columns(
conn: &mut PoolConnection<Postgres>,
cols: Vec<RootColumn>,
) -> sqlx::Result<usize> {
let mut builder = sqlx::QueryBuilder::new(
"INSERT INTO graph_registry_root_columns (root_id, column_name, graphql_type)",
);
builder.push_values(cols, |mut b, new_col| {
b.push_bind(new_col.root_id)
.push_bind(new_col.column_name)
.push_bind(new_col.graphql_type);
});
let query = builder.build();
let result = query.execute(conn).await?;
Ok(result.rows_affected() as usize)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn new_graph_root(
conn: &mut PoolConnection<Postgres>,
root: GraphRoot,
) -> sqlx::Result<usize> {
let mut builder = sqlx::QueryBuilder::new(
"INSERT INTO graph_registry_graph_root (version, schema_name, schema_identifier, schema)",
);
builder.push_values(std::iter::once(root), |mut b, root| {
b.push_bind(root.version)
.push_bind(root.schema_name)
.push_bind(root.schema_identifier)
.push_bind(root.schema);
});
let query = builder.build();
let result = query.execute(conn).await?;
Ok(result.rows_affected() as usize)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn graph_root_latest(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<GraphRoot> {
let row = sqlx::query(
"SELECT * FROM graph_registry_graph_root
WHERE schema_name = $1 AND schema_identifier = $2
ORDER BY id DESC LIMIT 1",
)
.bind(namespace)
.bind(identifier)
.fetch_one(conn)
.await?;
let id: i64 = row.get(0);
let version: String = row.get(1);
let schema_name: String = row.get(2);
let schema: String = row.get(3);
Ok(GraphRoot {
id,
version,
schema_name,
schema,
schema_identifier: identifier.to_string(),
})
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn type_id_list_by_name(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
version: &str,
identifier: &str,
) -> sqlx::Result<Vec<TypeId>> {
Ok(sqlx::query(
"SELECT * FROM graph_registry_type_ids
WHERE schema_name = $1
AND schema_version = $2
AND schema_identifier = $3",
)
.bind(namespace)
.bind(version)
.bind(identifier)
.fetch_all(conn)
.await?
.into_iter()
.map(|row| {
let id: i64 = row.get(0);
let version: String = row.get(1);
let namespace: String = row.get(2);
let graphql_name: String = row.get(3);
let table_name: String = row.get(4);
let identifier: String = row.get(5);
TypeId {
id,
version,
namespace,
table_name,
graphql_name,
identifier,
}
})
.collect::<Vec<TypeId>>())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn type_id_latest(
conn: &mut PoolConnection<Postgres>,
schema_name: &str,
identifier: &str,
) -> sqlx::Result<String> {
let latest = sqlx::query(
"SELECT schema_version FROM graph_registry_type_ids
WHERE schema_name = $1
AND schema_identifier = $2
ORDER BY id",
)
.bind(schema_name)
.bind(identifier)
.fetch_one(conn)
.await?;
let schema_version: String = latest.get(0);
Ok(schema_version)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn type_id_insert(
conn: &mut PoolConnection<Postgres>,
type_ids: Vec<TypeId>,
) -> sqlx::Result<usize> {
let mut builder = sqlx::QueryBuilder::new("INSERT INTO graph_registry_type_ids (id, schema_version, schema_name, schema_identifier, graphql_name, table_name)");
builder.push_values(type_ids, |mut b, tid| {
b.push_bind(tid.id)
.push_bind(tid.version)
.push_bind(tid.namespace)
.push_bind(tid.identifier)
.push_bind(tid.graphql_name)
.push_bind(tid.table_name);
});
let query = builder.build();
let result = query.execute(conn).await?;
Ok(result.rows_affected() as usize)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn schema_exists(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
version: &str,
) -> sqlx::Result<bool> {
let count = sqlx::query(
"SELECT COUNT(*) AS count FROM graph_registry_type_ids
WHERE schema_name = $1
AND schema_identifier = $2
AND schema_version = $3",
)
.bind(namespace)
.bind(identifier)
.bind(version)
.fetch_one(conn)
.await?;
let count: i64 = count.get(0);
Ok(count > 0)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn new_column_insert(
conn: &mut PoolConnection<Postgres>,
cols: Vec<Column>,
) -> sqlx::Result<usize> {
let mut builder = sqlx::QueryBuilder::new("INSERT INTO graph_registry_columns (type_id, column_position, column_name, column_type, nullable, graphql_type, is_unique, persistence)");
builder.push_values(cols, |mut b, new_col| {
b.push_bind(new_col.type_id)
.push_bind(new_col.position)
.push_bind(new_col.name)
.push_bind(new_col.coltype.to_string())
.push_bind(new_col.nullable)
.push_bind(new_col.graphql_type)
.push_bind(new_col.unique)
.push_bind(new_col.persistence.to_string());
});
let query = builder.build();
let result = query.execute(conn).await?;
Ok(result.rows_affected() as usize)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn list_column_by_id(
conn: &mut PoolConnection<Postgres>,
col_id: i64,
) -> sqlx::Result<Vec<Column>> {
Ok(
sqlx::query("SELECT * FROM graph_registry_columns WHERE type_id = $1")
.bind(col_id)
.fetch_all(conn)
.await?
.into_iter()
.map(|row| {
let id: i64 = row.get(0);
let type_id: i64 = row.get(1);
let position: i32 = row.get(2);
let name: String = row.get(3);
let coltype: String = row.get(4);
let nullable: bool = row.get(5);
let graphql_type: String = row.get(6);
let unique: bool = row.get(7);
let persistence: String = row.get(8);
let array_coltype: Option<String> = row.get(9);
Column {
id,
type_id,
position,
name,
coltype: ColumnType::from(coltype.as_str()),
nullable,
graphql_type,
unique,
persistence: Persistence::from_str(persistence.as_str())
.expect("Bad persistence."),
array_coltype: array_coltype.map(|t| ColumnType::from(t.as_str())),
}
})
.collect::<Vec<Column>>(),
)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn columns_get_schema(
conn: &mut PoolConnection<Postgres>,
name: &str,
identifier: &str,
version: &str,
) -> sqlx::Result<Vec<ColumnInfo>> {
Ok(sqlx::query(
"SELECT
c.type_id as type_id,
t.table_name as table_name,
c.column_position as column_position,
c.column_name as column_name,
c.column_type as column_type
FROM graph_registry_type_ids as t
INNER JOIN graph_registry_columns as c ON t.id = c.type_id
WHERE t.schema_name = $1
AND t.schema_identifier = $2
AND t.schema_version = $3
ORDER BY c.type_id, c.column_position",
)
.bind(name)
.bind(identifier)
.bind(version)
.fetch_all(conn)
.await?
.into_iter()
.map(|row: PgRow| {
let type_id: i64 = row.get(0);
let table_name: String = row.get(1);
let column_position: i32 = row.get(2);
let column_name: String = row.get(3);
let column_type: String = row.get(4);
ColumnInfo {
type_id,
table_name,
column_position,
column_name,
column_type,
}
})
.collect::<Vec<ColumnInfo>>())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn get_indexer(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<Option<RegisteredIndexer>> {
match sqlx::query(
"SELECT * FROM index_registry
WHERE namespace = $1
AND identifier = $2",
)
.bind(namespace)
.bind(identifier)
.fetch_optional(conn)
.await?
{
Some(row) => {
let created_at: DateTime<Utc> = {
let created_at: NaiveDateTime = row.get(4);
DateTime::<Utc>::from_naive_utc_and_offset(created_at, Utc)
};
Ok(Some(RegisteredIndexer {
id: row.get(0),
namespace: row.get(1),
identifier: row.get(2),
pubkey: row.get(3),
created_at,
}))
}
None => Ok(None),
}
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn register_indexer(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
pubkey: Option<&str>,
created_at: DateTime<Utc>,
) -> sqlx::Result<RegisteredIndexer> {
if let Some(index) = get_indexer(conn, namespace, identifier).await? {
return Ok(index);
}
let row = sqlx::query(
"INSERT INTO index_registry (namespace, identifier, pubkey, created_at)
VALUES ($1, $2, $3, $4)
RETURNING *",
)
.bind(namespace)
.bind(identifier)
.bind(pubkey)
.bind(created_at)
.fetch_one(conn)
.await?;
let id: i64 = row.get(0);
let namespace: String = row.get(1);
let identifier: String = row.get(2);
let pubkey = row.get(3);
let created_at: DateTime<Utc> = {
let created_at: NaiveDateTime = row.get(4);
DateTime::<Utc>::from_naive_utc_and_offset(created_at, Utc)
};
Ok(RegisteredIndexer {
id,
namespace,
identifier,
pubkey,
created_at,
})
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn all_registered_indexers(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<Vec<RegisteredIndexer>> {
Ok(sqlx::query("SELECT * FROM index_registry")
.fetch_all(conn)
.await?
.into_iter()
.map(|row| {
let id: i64 = row.get(0);
let namespace: String = row.get(1);
let identifier: String = row.get(2);
let pubkey = row.get(3);
let created_at: DateTime<Utc> = {
let created_at: NaiveDateTime = row.get(4);
DateTime::<Utc>::from_naive_utc_and_offset(created_at, Utc)
};
RegisteredIndexer {
id,
namespace,
identifier,
pubkey,
created_at,
}
})
.collect::<Vec<RegisteredIndexer>>())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn register_indexer_asset(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
bytes: Vec<u8>,
asset_type: IndexerAssetType,
pubkey: Option<&str>,
) -> sqlx::Result<IndexerAsset> {
let index = match get_indexer(conn, namespace, identifier).await? {
Some(index) => index,
None => {
let created_at = DateTime::<Utc>::from(SystemTime::now());
register_indexer(conn, namespace, identifier, pubkey, created_at).await?
}
};
let digest = sha256_digest(&bytes);
if let Some(asset) =
asset_already_exists(conn, &asset_type, &bytes, &index.id).await?
{
info!(
"Asset({asset_type:?}) for Indexer({}) already registered.",
index.uid()
);
return Ok(asset);
}
let query = format!(
"INSERT INTO index_asset_registry_{} (index_id, bytes, digest) VALUES ({}, $1, '{digest}') RETURNING *",
asset_type.as_ref(),
index.id,
);
let row = sqlx::QueryBuilder::new(query)
.build()
.bind(bytes)
.fetch_one(conn)
.await?;
info!(
"Registered Asset({:?}) with Version({}) to Indexer({}).",
asset_type,
digest,
index.uid()
);
let id = row.get(0);
let index_id = row.get(1);
let digest = row.get(2);
let bytes = row.get(3);
Ok(IndexerAsset {
id,
index_id,
digest,
bytes,
})
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn indexer_asset(
conn: &mut PoolConnection<Postgres>,
index_id: &i64,
asset_type: IndexerAssetType,
) -> sqlx::Result<IndexerAsset> {
let query = format!(
"SELECT * FROM index_asset_registry_{} WHERE index_id = {} ORDER BY id DESC LIMIT 1",
asset_type.as_ref(),
index_id
);
let row = sqlx::query(&query).fetch_one(conn).await?;
let id = row.get(0);
let index_id = row.get(1);
let digest = row.get(2);
let bytes = row.get(3);
Ok(IndexerAsset {
id,
index_id,
digest,
bytes,
})
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn indexer_assets(
conn: &mut PoolConnection<Postgres>,
indexer_id: &i64,
) -> sqlx::Result<IndexerAssetBundle> {
let wasm = indexer_asset(conn, indexer_id, IndexerAssetType::Wasm).await?;
let schema = indexer_asset(conn, indexer_id, IndexerAssetType::Schema).await?;
let manifest = indexer_asset(conn, indexer_id, IndexerAssetType::Manifest).await?;
Ok(IndexerAssetBundle {
wasm,
schema,
manifest,
})
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn last_block_height_for_indexer(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<u32> {
let query = format!(
"SELECT MAX(block_height) FROM {namespace}_{identifier}.indexmetadataentity LIMIT 1"
);
let row = sqlx::query(&query).fetch_one(conn).await?;
Ok(row
.try_get::<i32, usize>(0)
.map(|id| id.to_u32().expect("Bad block height."))
.unwrap_or(0))
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn asset_already_exists(
conn: &mut PoolConnection<Postgres>,
asset_type: &IndexerAssetType,
bytes: &Vec<u8>,
index_id: &i64,
) -> sqlx::Result<Option<IndexerAsset>> {
let digest = sha256_digest(bytes);
let query = format!(
"SELECT * FROM index_asset_registry_{} WHERE index_id = {} AND digest = '{}'",
asset_type.as_ref(),
index_id,
digest
);
match sqlx::QueryBuilder::new(query).build().fetch_one(conn).await {
Ok(row) => {
let id = row.get(0);
let index_id = row.get(1);
let digest = row.get(2);
let bytes = row.get(3);
Ok(Some(IndexerAsset {
id,
index_id,
digest,
bytes,
}))
}
Err(_e) => Ok(None),
}
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn get_indexer_id(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<i64> {
let row = sqlx::query(
"SELECT id FROM index_registry
WHERE namespace = $1
AND identifier = $2",
)
.bind(namespace)
.bind(identifier)
.fetch_one(conn)
.await?;
let id: i64 = row.get(0);
Ok(id)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn start_transaction(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<usize> {
execute_query(conn, "BEGIN".into()).await
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn commit_transaction(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<usize> {
execute_query(conn, "COMMIT".into()).await
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn revert_transaction(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<usize> {
execute_query(conn, "ROLLBACK".into()).await
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn remove_indexer(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
remove_data: bool,
) -> sqlx::Result<()> {
execute_query(
conn,
format!(
"DELETE FROM index_asset_registry_wasm WHERE index_id IN
(SELECT id FROM index_registry
WHERE namespace = '{namespace}' AND identifier = '{identifier}')"
),
)
.await?;
execute_query(
conn,
format!(
"DELETE FROM index_asset_registry_manifest WHERE index_id IN
(SELECT id FROM index_registry
WHERE namespace = '{namespace}' AND identifier = '{identifier}')"
),
)
.await?;
execute_query(
conn,
format!(
"DELETE FROM index_registry WHERE id IN
(SELECT id FROM index_registry
WHERE namespace = '{namespace}' AND identifier = '{identifier}')"
),
)
.await?;
execute_query(
conn,
format!(
"DELETE FROM index_asset_registry_schema WHERE index_id IN
(SELECT id FROM index_registry
WHERE namespace = '{namespace}' AND identifier = '{identifier}')"
),
)
.await?;
if remove_data {
execute_query(
conn,
format!(
"DELETE FROM graph_registry_columns WHERE type_id IN (SELECT id FROM graph_registry_type_ids WHERE schema_name = '{namespace}' AND schema_identifier = '{identifier}');"
),
)
.await?;
execute_query(
conn,
format!(
"DELETE FROM graph_registry_type_ids WHERE schema_name = '{namespace}' AND schema_identifier = '{identifier}';"
),
)
.await?;
execute_query(
conn,
format!(
"DELETE FROM graph_registry_root_columns WHERE root_id = (SELECT id FROM graph_registry_graph_root WHERE schema_name = '{namespace}' AND schema_identifier = '{identifier}');"
),
)
.await?;
execute_query(
conn,
format!(
"DELETE FROM graph_registry_graph_root WHERE schema_name = '{namespace}' AND schema_identifier = '{identifier}';"
),
)
.await?;
execute_query(
conn,
format!("DROP SCHEMA IF EXISTS {namespace}_{identifier} CASCADE"),
)
.await?;
}
Ok(())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn create_nonce(conn: &mut PoolConnection<Postgres>) -> sqlx::Result<Nonce> {
let uid = uuid::Uuid::new_v4().as_simple().to_string();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let expiry = now + NONCE_EXPIRY_SECS;
let row = sqlx::QueryBuilder::new(&format!(
"INSERT INTO nonce (uid, expiry) VALUES ('{uid}', {expiry}) RETURNING *"
))
.build()
.fetch_one(conn)
.await?;
let uid: String = row.get(1);
let expiry: i64 = row.get(2);
Ok(Nonce { uid, expiry })
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn get_nonce(
conn: &mut PoolConnection<Postgres>,
uid: &str,
) -> sqlx::Result<Nonce> {
let row = sqlx::query(&format!("SELECT * FROM nonce WHERE uid = '{uid}'"))
.fetch_one(conn)
.await?;
let uid: String = row.get(1);
let expiry: i64 = row.get(2);
Ok(Nonce { uid, expiry })
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn delete_nonce(
conn: &mut PoolConnection<Postgres>,
nonce: &Nonce,
) -> sqlx::Result<()> {
let _ = sqlx::query(&format!("DELETE FROM nonce WHERE uid = '{}'", nonce.uid))
.execute(conn)
.await?;
Ok(())
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn indexer_owned_by(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
pubkey: &str,
) -> sqlx::Result<()> {
let row = sqlx::query(&format!("SELECT COUNT(*)::int FROM index_registry WHERE namespace = '{namespace}' AND identifier = '{identifier}' AND pubkey = '{pubkey}'"))
.fetch_one(conn)
.await?;
let count = row.get::<i32, usize>(0);
if count == 1 {
return Ok(());
}
Err(sqlx::Error::RowNotFound)
}
#[cfg_attr(feature = "metrics", metrics)]
pub async fn put_many_to_many_record(
conn: &mut PoolConnection<Postgres>,
query: String,
) -> sqlx::Result<()> {
execute_query(conn, query).await?;
Ok(())
}
pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<()> {
let trigger_function = "CREATE OR REPLACE FUNCTION ensure_block_height_consecutive()
RETURNS TRIGGER AS $$
DECLARE
block_height integer;
BEGIN
EXECUTE format('SELECT MAX(block_height) FROM %I.%I', TG_TABLE_SCHEMA, TG_TABLE_NAME) INTO block_height;
IF NEW.block_height IS NOT NULL AND block_height IS NOT NULL AND NEW.block_height != block_height + 1 THEN
RAISE EXCEPTION '%.%: attempted to insert value with block_height = % while last indexed block_height = %. block_height values must be consecutive.', TG_TABLE_SCHEMA, TG_TABLE_NAME, NEW.block_height, block_height;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;".to_string();
execute_query(conn, trigger_function).await.unwrap();
let trigger = format!(
"DO
$$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_trigger
WHERE tgname = 'trigger_ensure_block_height_consecutive'
AND tgrelid = '{namespace}_{identifier}.indexmetadataentity'::regclass
) THEN
CREATE TRIGGER trigger_ensure_block_height_consecutive
BEFORE INSERT OR UPDATE ON {namespace}_{identifier}.indexmetadataentity
FOR EACH ROW
EXECUTE FUNCTION ensure_block_height_consecutive();
END IF;
END;
$$;"
);
execute_query(conn, trigger).await?;
Ok(())
}
pub async fn remove_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
) -> sqlx::Result<()> {
let trigger = format!(
"DROP TRIGGER IF EXISTS trigger_ensure_block_height_consecutive ON {namespace}_{identifier}.indexmetadataentity;"
);
execute_query(conn, trigger).await?;
Ok(())
}
pub async fn set_indexer_status(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
status: IndexerStatus,
) -> sqlx::Result<()> {
let indexer_id = get_indexer_id(conn, namespace, identifier).await?;
sqlx::query(
"INSERT INTO index_status (indexer_id, status, status_message)
VALUES ($1, $2, $3)
ON CONFLICT (indexer_id) DO UPDATE
SET status = EXCLUDED.status, status_message = EXCLUDED.status_message;",
)
.bind(indexer_id)
.bind(status.status_kind.to_string())
.bind(status.status_message)
.execute(conn)
.await?;
Ok(())
}
pub async fn all_registered_indexer_statuses(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<HashMap<(String, String), IndexerStatus>> {
let rows = sqlx::query(
"SELECT index_registry.namespace, index_registry.identifier, status, status_message
FROM index_status
INNER JOIN index_registry
ON index_status.indexer_id = index_registry.id;"
)
.fetch_all(conn)
.await?;
let mut result = HashMap::new();
for row in rows {
let namespace: String = row.get(0);
let identifier: String = row.get(1);
let status_kind =
IndexerStatusKind::from_str(row.get(2)).unwrap_or(IndexerStatusKind::Unknown);
let status_message: String = row.get(3);
let status = IndexerStatus {
status_kind,
status_message,
};
result.insert((namespace, identifier), status);
}
Ok(result)
}