use crate::{
executor::WasmIndexExecutor, Executor, IndexerConfig, IndexerError, IndexerResult,
Manifest,
};
use anyhow::Context;
use async_std::sync::Arc;
use async_std::{fs::File, io::ReadExt};
use fuel_indexer_database::{
queries,
types::{IndexerAssetType, IndexerStatus},
IndexerConnection, IndexerConnectionPool,
};
use fuel_indexer_lib::utils::ServiceRequest;
use fuel_indexer_schema::db::manager::SchemaManager;
use std::collections::HashMap;
use std::marker::Send;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc::Receiver;
use tracing::{error, info, warn};
/// Service that registers indexers, runs their executors as background tasks,
/// and reacts to reload/stop requests received over a channel.
pub struct IndexerService {
/// Configuration handed to every executor this service creates and runs.
config: IndexerConfig,
/// Database connection pool; connections are acquired from it per operation
/// and clones of it are handed to executors.
pool: IndexerConnectionPool,
/// Schema manager built from `pool`; used to create the schema of a newly
/// registered indexer.
manager: SchemaManager,
/// Set of spawned executor tasks, polled by `run` for completion.
tasks: tokio::task::JoinSet<anyhow::Result<()>>,
/// Receiving end of the channel carrying `ServiceRequest`s.
rx: Receiver<ServiceRequest>,
/// Kill switches of started executors, keyed by the indexer UID.
/// Storing `true` into a switch is how an executor is asked to stop.
killers: HashMap<String, Arc<AtomicBool>>,
}
impl IndexerService {
/// Builds a new `IndexerService`.
///
/// The schema manager is created from a clone of `pool`; the service starts
/// with no running tasks and no registered kill switches.
pub async fn new(
    config: IndexerConfig,
    pool: IndexerConnectionPool,
    rx: Receiver<ServiceRequest>,
) -> IndexerResult<IndexerService> {
    let service = IndexerService {
        // Clone the pool for the manager before the pool itself is moved in.
        manager: SchemaManager::new(pool.clone()),
        config,
        pool,
        rx,
        tasks: tokio::task::JoinSet::new(),
        killers: HashMap::new(),
    };

    Ok(service)
}
/// Registers an indexer described by `manifest`, persists its assets, and
/// starts its executor.
///
/// If an indexer with the same namespace and identifier is already
/// registered, the call fails unless `config.replace_indexer` is set; when
/// replacement is allowed, the existing registration is removed first
/// (`remove_data` is forwarded to the removal query).
///
/// # Errors
///
/// Returns an error if the indexer already exists and replacement is not
/// enabled, or if any database query, schema creation, WASM file read,
/// executor creation or executor start fails.
pub async fn register_indexer_from_manifest(
    &mut self,
    mut manifest: Manifest,
    remove_data: bool,
) -> IndexerResult<()> {
    let mut conn = self.pool.acquire().await?;

    // NOTE(review): any failure of this lookup (not only "not found") is
    // treated as "the indexer does not exist".
    let indexer_exists = queries::get_indexer_id(
        &mut conn,
        manifest.namespace(),
        manifest.identifier(),
    )
    .await
    .is_ok();

    // Reject the request *before* touching the running executor, so that a
    // refused registration does not stop an indexer that is working fine.
    if indexer_exists && !self.config.replace_indexer {
        return Err(anyhow::anyhow!(
            "Indexer({}.{}) already exists.",
            manifest.namespace(),
            manifest.identifier()
        )
        .into());
    }

    // From here on we are committed to (re)deploying: ask the previous
    // executor, if any, to stop. The entry is removed so that a failure
    // further down does not leave a stale kill switch behind.
    if let Some(killer) = self.killers.remove(&manifest.uid()) {
        killer.store(true, Ordering::SeqCst);
    }

    if indexer_exists {
        if let Err(e) = queries::remove_indexer(
            &mut conn,
            manifest.namespace(),
            manifest.identifier(),
            remove_data,
        )
        .await
        {
            error!(
                "Failed to remove Indexer({}.{}): {e}",
                manifest.namespace(),
                manifest.identifier()
            );
            // NOTE(review): no transaction is explicitly started in this
            // function; confirm what this revert is expected to undo.
            queries::revert_transaction(&mut conn).await?;
            return Err(e.into());
        }
    }

    let _indexer = queries::register_indexer(
        &mut conn,
        manifest.namespace(),
        manifest.identifier(),
        None,
    )
    .await?;

    let schema = manifest.graphql_schema_content()?;
    // Capture what we need from the schema before it is moved into the manager.
    let schema_version = schema.version().to_string();
    let schema_bytes = Vec::<u8>::from(&schema);

    self.manager
        .new_schema(
            manifest.namespace(),
            manifest.identifier(),
            schema,
            &mut conn,
        )
        .await?;

    let start_block = get_start_block(&mut conn, &manifest).await?;
    manifest.set_start_block(start_block);

    let wasm_bytes = match manifest.module() {
        crate::Module::Wasm(ref module) => {
            let mut bytes = Vec::<u8>::new();
            let mut file = File::open(module).await?;
            file.read_to_end(&mut bytes).await?;
            bytes
        }
    };

    // The executor takes its own copy of the module; the original bytes are
    // persisted as an asset below.
    let executor = WasmIndexExecutor::create(
        &self.config,
        &manifest,
        self.pool.clone(),
        schema_version,
        wasm_bytes.clone(),
    )
    .await?;

    // Assets are stored in this order: schema, manifest, WASM module.
    let assets = vec![
        (IndexerAssetType::Schema, schema_bytes),
        (IndexerAssetType::Manifest, manifest.clone().into()),
        (IndexerAssetType::Wasm, wasm_bytes),
    ];

    for (asset_type, bytes) in assets {
        info!(
            "Registering Asset({asset_type:?}) for Indexer({})",
            manifest.uid()
        );

        queries::register_indexer_asset(
            &mut conn,
            manifest.namespace(),
            manifest.identifier(),
            bytes,
            asset_type,
            None,
        )
        .await?;
    }

    info!(
        "Registered Indexer({}.{})",
        manifest.namespace(),
        manifest.identifier()
    );

    self.start_executor(executor).await?;

    Ok(())
}
pub async fn register_indexers_from_registry(&mut self) -> IndexerResult<()> {
let mut conn = self.pool.acquire().await?;
let indices = queries::all_registered_indexers(&mut conn).await?;
for index in indices {
let assets = queries::indexer_assets(&mut conn, &index.id).await?;
let mut manifest = Manifest::try_from(&assets.manifest.bytes)?;
let start_block = get_start_block(&mut conn, &manifest).await.unwrap_or(1);
manifest.set_start_block(start_block);
if let Ok(executor) = WasmIndexExecutor::create(
&self.config,
&manifest,
self.pool.clone(),
assets.schema.digest,
assets.wasm.bytes,
)
.await
{
info!("Registered Indexer({})", manifest.uid());
self.start_executor(executor).await?;
} else {
error!(
"Failed to register Indexer({}) from registry.",
manifest.uid()
);
}
}
Ok(())
}
/// Runs the service loop: reaps finished executor tasks and handles incoming
/// `ServiceRequest`s.
///
/// * `Reload` loads the stored assets of the named indexer, stops the
///   previously running executor (if any) and starts a new one. A reload that
///   fails because the indexer is unknown or its executor cannot be created
///   is logged and the loop keeps running.
/// * `Stop` flips the kill switch of the named indexer, or logs a warning if
///   no such indexer is running.
///
/// The loop ends with `Ok(())` once there are no tasks left to wait for and
/// the request channel is closed.
///
/// # Errors
///
/// Returns an error if, while handling a reload, a connection cannot be
/// acquired, the stored assets cannot be read or parsed, the start block
/// cannot be determined, or the new executor cannot be started.
pub async fn run(mut self) -> IndexerResult<()> {
    loop {
        tokio::select! {
            // Match every finished task, not only failed joins, so this
            // branch is only disabled when the task set is empty.
            Some(joined) = self.tasks.join_next() => {
                if let Err(e) = joined {
                    error!("Error retiring indexer task {e}");
                }
            }
            Some(service_request) = self.rx.recv() => {
                match service_request {
                    ServiceRequest::Reload(request) => {
                        let mut conn = self.pool.acquire().await?;

                        let id = match queries::get_indexer_id(
                            &mut conn,
                            &request.namespace,
                            &request.identifier,
                        )
                        .await
                        {
                            Ok(id) => id,
                            Err(e) => {
                                error!(
                                    "Failed to find Indexer({}.{}): {}",
                                    &request.namespace, &request.identifier, e
                                );
                                continue;
                            }
                        };

                        let assets = queries::indexer_assets(&mut conn, &id).await?;
                        let mut manifest = Manifest::try_from(&assets.manifest.bytes)?;
                        let start_block = get_start_block(&mut conn, &manifest).await?;
                        manifest.set_start_block(start_block);

                        if let Some(killer_for_prev_executor) =
                            self.killers.remove(&manifest.uid())
                        {
                            let uid = manifest.uid();
                            info!("Indexer({uid}) is being replaced. Stopping previous version of Indexer({uid}).");
                            killer_for_prev_executor.store(true, Ordering::SeqCst);
                        }

                        match WasmIndexExecutor::create(
                            &self.config,
                            &manifest,
                            self.pool.clone(),
                            assets.schema.digest,
                            assets.wasm.bytes,
                        )
                        .await
                        {
                            Ok(executor) => self.start_executor(executor).await?,
                            Err(e) => {
                                error!(
                                    "Failed to reload Indexer({}.{}): {e:?}",
                                    &request.namespace, &request.identifier
                                );
                                // Keep serving the other indexers: returning
                                // here would end the whole service loop
                                // because one reload failed.
                                continue;
                            }
                        }
                    }
                    ServiceRequest::Stop(request) => {
                        let uid = format!("{}.{}", request.namespace, request.identifier);

                        match self.killers.remove(&uid) {
                            Some(killer) => killer.store(true, Ordering::SeqCst),
                            None => {
                                warn!(
                                    "Stop Indexer: No indexer with the name Indexer({uid})"
                                );
                            }
                        }
                    }
                }
            }
            // Reached only when both branches above are disabled: the task
            // set is empty and the request channel is closed. Without this
            // branch `select!` would panic in that situation.
            else => {
                info!("No running indexers and the request channel is closed; stopping the indexer service.");
                return Ok(());
            }
        }
    }
}
#[allow(clippy::result_large_err)]
async fn start_executor<T: 'static + Executor + Send + Sync>(
&mut self,
executor: T,
) -> anyhow::Result<()> {
let uid = executor.manifest().uid();
let namespace = executor.manifest().namespace().to_string();
let identifier = executor.manifest().identifier().to_string();
let mut conn = self.pool.acquire().await?;
self.killers
.insert(uid.clone(), executor.kill_switch().clone());
let task =
crate::executor::run_executor(&self.config, self.pool.clone(), executor)?;
self.tasks.spawn(async move {
queries::set_indexer_status(
&mut conn,
&namespace,
&identifier,
IndexerStatus::starting(),
)
.await
.with_context(|| {
format!("Failed to set Indexer({namespace}.{identifier}) status.")
})?;
let result = task.await;
let status: IndexerStatus;
if let Err(ref e) = result {
match e {
IndexerError::KillSwitch | IndexerError::EndBlockMet => {
info!("Indexer({namespace}.{identifier}) terminated: {e}");
status = IndexerStatus::stopped(e.to_string())
}
_ => {
error!("Indexer({namespace}.{identifier}) terminated with an error: {e}");
status = IndexerStatus::error(e.to_string())
}
};
} else {
info!("Indexer({namespace}.{identifier}) stopped.");
status = IndexerStatus::stopped("".to_string());
}
queries::set_indexer_status(&mut conn, &namespace, &identifier, status)
.await
.with_context(|| {
format!("Failed to set Indexer({namespace}.{identifier}) status.")
})?;
result.map_err(Into::into)
});
Ok(())
}
}
/// Determines the block an indexer should start from.
///
/// * When the manifest does not set `resumable`, the manifest's start block
///   is used, defaulting to `1`; the database is not queried.
/// * When `resumable` is set, the last indexed block height is read from the
///   database and used as the default for a missing start block. If
///   `resumable` is `true`, the result is the larger of the start block and
///   the block following the last indexed one; if `false`, the start block is
///   used as is.
///
/// # Errors
///
/// Returns an error if the last indexed block height cannot be read.
pub async fn get_start_block(
    conn: &mut IndexerConnection,
    manifest: &Manifest,
) -> Result<u32, IndexerError> {
    // No `resumable` setting at all: no database lookup is needed.
    let is_resumable = match &manifest.resumable() {
        Some(flag) => *flag,
        None => {
            let first_block = manifest.start_block().unwrap_or(1);
            info!(
                "Starting Indexer({}) from block {first_block}",
                manifest.uid()
            );
            return Ok(first_block);
        }
    };

    let last_indexed = queries::last_block_height_for_indexer(
        conn,
        manifest.namespace(),
        manifest.identifier(),
    )
    .await?;

    let configured = manifest.start_block().unwrap_or(last_indexed);

    let (action, first_block) = if is_resumable {
        // Never go back to a block that has already been indexed.
        ("Resuming", std::cmp::max(configured, last_indexed + 1))
    } else {
        ("Starting", configured)
    };

    info!(
        "{action} Indexer({}) from block {first_block}",
        manifest.uid()
    );

    Ok(first_block)
}