use crate::{
database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError,
IndexerResult,
};
use anyhow::Context;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use fuel_core_client::client::{
pagination::{PageDirection, PaginatedResult, PaginationRequest},
schema::block::{Consensus as ClientConsensus, Genesis as ClientGenesis},
types::TransactionStatus as ClientTransactionStatus,
FuelClient,
};
use fuel_indexer_database::{queries, types::IndexerStatus, IndexerConnectionPool};
use fuel_indexer_lib::{
defaults::*, manifest::Manifest, utils::serialize, WasmIndexerError,
};
#[cfg(feature = "metrics")]
use fuel_indexer_metrics::METRICS;
use fuel_indexer_types::{
fuel::{field::*, *},
scalar::{Bytes, Bytes32},
};
use fuel_tx::UniqueIdentifier;
use fuel_vm::prelude::Deserializable;
use fuel_vm::state::ProgramState as ClientProgramState;
use futures::Future;
use itertools::Itertools;
use std::{
marker::{Send, Sync},
path::Path,
str::FromStr,
sync::atomic::{AtomicBool, Ordering},
};
use tokio::{
task::spawn_blocking,
time::{sleep, Duration},
};
use tracing::{debug, error, info, warn};
use wasmer::{
imports, CompilerConfig, Cranelift, FunctionEnv, Instance, Memory, Module, Store,
TypedFunction,
};
use wasmer_middlewares::metering::MeteringPoints;
#[cfg(feature = "metrics")]
use tokio::time::Instant;
#[derive(Debug, Clone)]
pub enum ExecutorSource {
Manifest,
Registry(Vec<u8>),
}
impl AsRef<[u8]> for ExecutorSource {
fn as_ref(&self) -> &[u8] {
match self {
ExecutorSource::Manifest => &[],
ExecutorSource::Registry(b) => b,
}
}
}
impl From<ExecutorSource> for Vec<u8> {
fn from(source: ExecutorSource) -> Self {
match source {
ExecutorSource::Manifest => vec![],
ExecutorSource::Registry(bytes) => bytes,
}
}
}
pub fn run_executor<T: 'static + Executor + Send + Sync>(
config: &IndexerConfig,
pool: IndexerConnectionPool,
mut executor: T,
) -> anyhow::Result<impl Future<Output = IndexerResult<()>>> {
let end_block = executor.manifest().end_block();
let stop_idle_indexers = config.stop_idle_indexers;
let indexer_uid = executor.manifest().uid();
let block_page_size = config.block_page_size;
let fuel_node_addr = executor
.manifest()
.fuel_client()
.map(|x| x.to_string())
.unwrap_or(config.fuel_node.to_string());
let mut cursor = executor.manifest().start_block().map(|x| {
if x > 1 {
let decremented = x - 1;
decremented.to_string()
} else {
"0".to_string()
}
});
info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}");
let client = FuelClient::from_str(&fuel_node_addr)
.with_context(|| "Client node connection failed".to_string())?;
if let Some(end_block) = end_block {
info!("Indexer({indexer_uid}) will stop at block #{end_block}.");
} else {
warn!("No end_block specified in the manifest. Indexer({indexer_uid}) will run forever.");
}
let allow_non_sequential_blocks = config.allow_non_sequential_blocks;
let client_request_delay = config.client_request_delay;
let task = async move {
let mut conn = pool
.acquire()
.await
.with_context(|| "Unable to acquire a database connection".to_string())?;
if allow_non_sequential_blocks {
queries::remove_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.with_context(|| {
"Unable to remove the sequential blocks trigger".to_string()
})?;
} else {
queries::create_ensure_block_height_consecutive_trigger(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
)
.await
.with_context(|| {
"Unable to create the sequential blocks trigger".to_string()
})?;
}
let mut consecutive_retries = 0;
let max_empty_block_reqs = if stop_idle_indexers {
MAX_CONSECUTIVE_EMPTY_BLOCK_RESPONSES
} else {
usize::MAX
};
let mut num_empty_block_reqs = 0;
loop {
if executor.kill_switch().load(Ordering::SeqCst) {
return Err(IndexerError::KillSwitch);
}
#[cfg(feature = "metrics")]
let start = Instant::now();
let (block_info, next_cursor, _has_next_page) =
match retrieve_blocks_from_node(
&client,
block_page_size,
&cursor,
end_block,
&indexer_uid,
)
.await
{
Ok((block_info, next_cursor, has_next_page)) => {
(block_info, next_cursor, has_next_page)
}
Err(e) => {
if let IndexerError::EndBlockMet = e {
info!("Indexer({indexer_uid}) has met its end block; beginning indexer shutdown process.");
executor.kill_switch().store(true, Ordering::SeqCst);
continue;
} else {
error!(
"Indexer({indexer_uid}) failed to fetch blocks: {e:?}",
);
sleep(Duration::from_secs(DELAY_FOR_SERVICE_ERROR)).await;
continue;
}
}
};
#[cfg(feature = "metrics")]
{
METRICS
.exec
.web
.record(&indexer_uid, start.elapsed().as_millis() as f64);
}
if block_info.is_empty() {
num_empty_block_reqs += 1;
info!(
"Indexer({indexer_uid}) has no new blocks to process, sleeping zzZZ. (Empty response #{num_empty_block_reqs})"
);
if num_empty_block_reqs == max_empty_block_reqs {
return Err(anyhow::format_err!(
"No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"
).into());
}
sleep(Duration::from_secs(IDLE_SERVICE_WAIT_SECS)).await;
continue;
}
let result = executor.handle_events(block_info).await;
if executor.kill_switch().load(Ordering::SeqCst) {
return Err(IndexerError::KillSwitch);
}
if let Err(e) = result {
if let IndexerError::RuntimeError(ref e) = e {
match e.downcast_ref::<WasmIndexerError>() {
Some(&WasmIndexerError::MissingBlocksError) => {
return Err(anyhow::anyhow!("{e}").into());
}
Some(&WasmIndexerError::Panic)
| Some(&WasmIndexerError::GeneralError) => {
let message = executor.get_error_message().await.unwrap_or(
"unable to extract the error message".to_string(),
);
return Err(anyhow::anyhow!("{message}").into());
}
_ => (),
}
}
if let IndexerError::RunTimeLimitExceededError = e {
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"
).into());
}
if consecutive_retries >= INDEXER_FAILED_CALLS {
return Err(anyhow::format_err!(
"Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>"
).into());
}
if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e {
if inner.constraint().is_some() {
warn!("Constraint violation: {inner:?}. This is not a retry-able error. Continuing...");
cursor = next_cursor;
continue;
}
}
warn!("Indexer({indexer_uid}) retrying handler after {consecutive_retries}/{INDEXER_FAILED_CALLS} failed attempts.");
consecutive_retries += 1;
continue;
}
num_empty_block_reqs = 0;
cursor = next_cursor;
queries::set_indexer_status(
&mut conn,
executor.manifest().namespace(),
executor.manifest().identifier(),
IndexerStatus::running(format!(
"Indexed {} blocks",
cursor.clone().unwrap_or("0".to_string())
)),
)
.await?;
if executor.kill_switch().load(Ordering::SeqCst) {
return Err(IndexerError::KillSwitch);
}
consecutive_retries = 0;
if let Some(delay) = client_request_delay {
sleep(Duration::from_secs(delay)).await;
}
}
};
Ok(task)
}
pub async fn retrieve_blocks_from_node(
client: &FuelClient,
block_page_size: usize,
cursor: &Option<String>,
end_block: Option<u32>,
indexer_uid: &str,
) -> IndexerResult<(Vec<BlockData>, Option<String>, bool)> {
let page_size = if let (Some(start), Some(end)) = (cursor, end_block) {
if let Ok(start) = start.parse::<u32>() {
if start >= end {
return Err(IndexerError::EndBlockMet);
}
std::cmp::min((end - start) as usize, block_page_size)
} else {
block_page_size
}
} else {
block_page_size
};
debug!("Fetching paginated results from {cursor:?}");
let PaginatedResult {
cursor,
results,
has_next_page,
..
} = client
.full_blocks(PaginationRequest {
cursor: cursor.clone(),
results: page_size,
direction: PageDirection::Forward,
})
.await
.unwrap_or_else(|e| {
error!("Indexer({indexer_uid}) failed to retrieve blocks: {e:?}");
PaginatedResult {
cursor: None,
results: vec![],
has_next_page: false,
has_previous_page: false,
}
});
let chain_id = client.chain_info().await?.consensus_parameters.chain_id;
let mut block_info = Vec::new();
for block in results.into_iter() {
let producer: Option<Bytes32> = block.block_producer().map(|pk| pk.hash());
let mut transactions = Vec::new();
for trans in block.transactions {
let receipts = trans
.receipts
.unwrap_or_default()
.into_iter()
.map(TryInto::try_into)
.try_collect()
.expect("Bad receipts.");
let status = trans.status.expect("Bad transaction status.");
let status = match status.try_into().unwrap() {
ClientTransactionStatus::Success {
block_id,
time,
program_state,
} => {
let program_state = program_state.map(|p| match p {
ClientProgramState::Return(w) => ProgramState {
return_type: ReturnType::Return,
data: Bytes::from(w.to_le_bytes().to_vec()),
},
ClientProgramState::ReturnData(d) => ProgramState {
return_type: ReturnType::ReturnData,
data: Bytes::from(d.to_vec()),
},
ClientProgramState::Revert(w) => ProgramState {
return_type: ReturnType::Revert,
data: Bytes::from(w.to_le_bytes().to_vec()),
},
#[allow(unreachable_patterns)]
_ => unreachable!("Bad program state."),
});
TransactionStatus::Success {
block: block_id.parse().expect("Bad block height."),
time: time.to_unix() as u64,
program_state,
}
}
ClientTransactionStatus::Failure {
block_id,
time,
reason,
program_state,
} => {
let program_state = program_state.map(|p| match p {
ClientProgramState::Return(w) => ProgramState {
return_type: ReturnType::Return,
data: Bytes::from(w.to_le_bytes().to_vec()),
},
ClientProgramState::ReturnData(d) => ProgramState {
return_type: ReturnType::ReturnData,
data: Bytes::from(d.to_vec()),
},
ClientProgramState::Revert(w) => ProgramState {
return_type: ReturnType::Revert,
data: Bytes::from(w.to_le_bytes().to_vec()),
},
#[allow(unreachable_patterns)]
_ => unreachable!("Bad program state."),
});
TransactionStatus::Failure {
block: block_id.parse().expect("Bad block ID."),
time: time.to_unix() as u64,
program_state,
reason,
}
}
ClientTransactionStatus::Submitted { submitted_at } => {
TransactionStatus::Submitted {
submitted_at: submitted_at.to_unix() as u64,
}
}
ClientTransactionStatus::SqueezedOut { reason } => {
TransactionStatus::SqueezedOut { reason }
}
};
let transaction: fuel_tx::Transaction =
fuel_tx::Transaction::from_bytes(trans.raw_payload.0 .0.as_slice())
.expect("Bad transaction.");
let id = transaction.id(&chain_id);
let transaction = match transaction {
ClientTransaction::Create(tx) => Transaction::Create(Create {
gas_price: *tx.gas_price(),
gas_limit: *tx.gas_limit(),
maturity: *tx.maturity(),
bytecode_length: *tx.bytecode_length(),
bytecode_witness_index: *tx.bytecode_witness_index(),
storage_slots: tx
.storage_slots()
.iter()
.map(|x| StorageSlot {
key: <[u8; 32]>::from(*x.key()).into(),
value: <[u8; 32]>::from(*x.value()).into(),
})
.collect(),
inputs: tx.inputs().iter().map(|i| i.to_owned().into()).collect(),
outputs: tx.outputs().iter().map(|o| o.to_owned().into()).collect(),
witnesses: tx.witnesses().to_vec(),
salt: <[u8; 32]>::from(*tx.salt()).into(),
metadata: None,
}),
ClientTransaction::Script(tx) => Transaction::Script(Script {
gas_price: *tx.gas_price(),
gas_limit: *tx.gas_limit(),
maturity: *tx.maturity(),
script: (*tx.script().clone()).to_vec(),
script_data: (*tx.script_data().clone()).to_vec(),
inputs: tx.inputs().iter().map(|i| i.to_owned().into()).collect(),
outputs: tx.outputs().iter().map(|o| o.to_owned().into()).collect(),
witnesses: tx.witnesses().to_vec(),
receipts_root: <[u8; 32]>::from(*tx.receipts_root()).into(),
metadata: None,
}),
ClientTransaction::Mint(tx) => Transaction::Mint(Mint {
tx_pointer: tx.tx_pointer().to_owned().into(),
outputs: tx.outputs().iter().map(|o| o.to_owned().into()).collect(),
metadata: None,
}),
};
let tx_data = TransactionData {
receipts,
status,
transaction,
id,
};
transactions.push(tx_data);
}
let consensus = match &block.consensus {
ClientConsensus::Unknown => Consensus::Unknown,
ClientConsensus::Genesis(g) => {
let ClientGenesis {
chain_config_hash,
coins_root,
contracts_root,
messages_root,
} = g.to_owned();
Consensus::Genesis(Genesis {
chain_config_hash: <[u8; 32]>::from(
chain_config_hash.to_owned().0 .0,
)
.into(),
coins_root: <[u8; 32]>::from(coins_root.0 .0.to_owned()).into(),
contracts_root: <[u8; 32]>::from(contracts_root.0 .0.to_owned())
.into(),
messages_root: <[u8; 32]>::from(messages_root.0 .0.to_owned()).into(),
})
}
ClientConsensus::PoAConsensus(poa) => Consensus::PoA(PoA {
signature: <[u8; 64]>::from(poa.signature.0 .0.to_owned()).into(),
}),
};
let block = BlockData {
height: block.header.height.clone().into(),
id: Bytes32::from(<[u8; 32]>::from(block.id.0 .0)),
producer,
time: block.header.time.0.to_unix(),
consensus,
header: Header {
id: block.header.id.into(),
da_height: block.header.da_height.0,
transactions_count: block.header.transactions_count.into(),
message_receipt_count: block.header.message_receipt_count.into(),
transactions_root: block.header.transactions_root.into(),
message_receipt_root: block.header.message_receipt_root.into(),
height: block.header.height.into(),
prev_root: block.header.prev_root.into(),
time: block.header.time.0.to_unix(),
application_hash: block.header.application_hash.into(),
},
transactions,
};
block_info.push(block);
}
Ok((block_info, cursor, has_next_page))
}
#[async_trait]
pub trait Executor
where
Self: Sized,
{
async fn handle_events(&mut self, blocks: Vec<BlockData>) -> IndexerResult<()>;
fn manifest(&self) -> &Manifest;
fn kill_switch(&self) -> &Arc<AtomicBool>;
async fn get_error_message(&self) -> IndexerResult<String>;
}
#[derive(Clone)]
pub struct IndexEnv {
pub memory: Option<Memory>,
pub alloc: Option<TypedFunction<u32, u32>>,
pub dealloc: Option<TypedFunction<(u32, u32), ()>>,
pub db: Arc<Mutex<Database>>,
pub kill_switch: Arc<AtomicBool>,
}
impl IndexEnv {
pub async fn new(
pool: IndexerConnectionPool,
manifest: &Manifest,
config: &IndexerConfig,
kill_switch: Arc<AtomicBool>,
) -> IndexerResult<IndexEnv> {
let db = Database::new(pool, manifest, config).await;
Ok(IndexEnv {
memory: None,
alloc: None,
dealloc: None,
db: Arc::new(Mutex::new(db)),
kill_switch,
})
}
}
#[derive(Debug)]
pub struct WasmIndexExecutor {
instance: Instance,
_module: Module,
store: Arc<Mutex<Store>>,
db: Arc<Mutex<Database>>,
metering_points: Option<u64>,
manifest: Manifest,
kill_switch: Arc<AtomicBool>,
}
impl WasmIndexExecutor {
pub async fn new(
config: &IndexerConfig,
manifest: &Manifest,
wasm_bytes: impl AsRef<[u8]>,
pool: IndexerConnectionPool,
schema_version: String,
) -> IndexerResult<Self> {
let mut compiler_config = Cranelift::new();
if let Some(metering_points) = config.metering_points {
let metering =
Arc::new(wasmer_middlewares::Metering::new(metering_points, |_| 1));
compiler_config.push_middleware(metering);
}
let kill_switch = Arc::new(AtomicBool::new(false));
let idx_env = IndexEnv::new(pool, manifest, config, kill_switch.clone()).await?;
let db: Arc<Mutex<Database>> = idx_env.db.clone();
let mut store = Store::new(compiler_config);
let module = Module::new(&store, &wasm_bytes)?;
let env = FunctionEnv::new(&mut store, idx_env);
let mut imports = imports! {};
for (export_name, export) in ffi::get_exports(&mut store, &env) {
imports.define("env", &export_name, export.clone());
}
let instance = Instance::new(&mut store, &module, &imports)?;
if !instance
.exports
.contains(ffi::MODULE_ENTRYPOINT.to_string())
{
return Err(IndexerError::MissingHandler);
}
{
let schema_version_from_wasm = ffi::get_version(&mut store, &instance)?;
let mut env_mut = env.clone().into_mut(&mut store);
let (data_mut, store_mut) = env_mut.data_and_store_mut();
if schema_version_from_wasm != schema_version {
return Err(IndexerError::SchemaVersionMismatch(format!(
"Schema version from WASM {} does not match schema version from database {}",
schema_version_from_wasm, schema_version
)));
}
data_mut.memory = Some(instance.exports.get_memory("memory")?.clone());
data_mut.alloc = Some(
instance
.exports
.get_typed_function(&store_mut, "alloc_fn")?,
);
data_mut.dealloc = Some(
instance
.exports
.get_typed_function(&store_mut, "dealloc_fn")?,
);
}
db.lock().await.load_schema(schema_version).await?;
Ok(WasmIndexExecutor {
instance,
_module: module,
store: Arc::new(Mutex::new(store)),
db: db.clone(),
metering_points: config.metering_points,
manifest: manifest.clone(),
kill_switch,
})
}
pub async fn from_file(
p: impl AsRef<Path>,
config: Option<IndexerConfig>,
pool: IndexerConnectionPool,
) -> IndexerResult<WasmIndexExecutor> {
let config = config.unwrap_or_default();
let manifest = Manifest::from_file(p)?;
let bytes = manifest.module_bytes()?;
let schema_version = manifest.graphql_schema_content()?.version().to_string();
Self::new(&config, &manifest, bytes, pool, schema_version).await
}
pub async fn create(
config: &IndexerConfig,
manifest: &Manifest,
pool: IndexerConnectionPool,
schema_version: String,
wasm_bytes: impl AsRef<[u8]>,
) -> IndexerResult<Self> {
let uid = manifest.uid();
let mut conn = pool.acquire().await?;
queries::set_indexer_status(
&mut conn,
manifest.namespace(),
manifest.identifier(),
IndexerStatus::instantiating(),
)
.await?;
match WasmIndexExecutor::new(
config,
manifest,
wasm_bytes,
pool.clone(),
schema_version,
)
.await
{
Ok(executor) => Ok(executor),
Err(e) => {
error!("Could not instantiate WasmIndexExecutor({uid}): {e:?}.");
let mut conn = pool.acquire().await?;
queries::set_indexer_status(
&mut conn,
manifest.namespace(),
manifest.identifier(),
IndexerStatus::error(format!("{e}")),
)
.await?;
Err(IndexerError::WasmExecutionInstantiationError)
}
}
}
pub fn metering_enabled(&self) -> bool {
self.metering_points.is_some()
}
pub async fn metering_points_exhausted(&self) -> bool {
if self.metering_enabled() {
self.get_remaining_metering_points().await.unwrap()
== MeteringPoints::Exhausted
} else {
false
}
}
pub async fn get_remaining_metering_points(&self) -> Option<MeteringPoints> {
if self.metering_enabled() {
let mut store_guard = self.store.lock().await;
let result = wasmer_middlewares::metering::get_remaining_points(
&mut store_guard,
&self.instance,
);
Some(result)
} else {
None
}
}
pub async fn set_metering_points(&self, metering_points: u64) -> IndexerResult<()> {
if self.metering_enabled() {
let mut store_guard = self.store.lock().await;
wasmer_middlewares::metering::set_remaining_points(
&mut store_guard,
&self.instance,
metering_points,
);
Ok(())
} else {
Err(anyhow::anyhow!(
"Attempting to set metering points when metering is not enables"
.to_string(),
)
.into())
}
}
}
#[async_trait]
impl Executor for WasmIndexExecutor {
async fn handle_events(&mut self, blocks: Vec<BlockData>) -> IndexerResult<()> {
if blocks.is_empty() {
return Ok(());
}
if let Some(metering_points) = self.metering_points {
self.set_metering_points(metering_points).await?
}
let bytes = serialize(&blocks);
let uid = self.manifest.uid();
let fun = {
let store_guard = self.store.lock().await;
self.instance.exports.get_typed_function::<(u32, u32), ()>(
&store_guard,
ffi::MODULE_ENTRYPOINT,
)?
};
let _ = self.db.lock().await.start_transaction().await?;
#[cfg(feature = "metrics")]
let start = Instant::now();
let res = spawn_blocking({
let store = self.store.clone();
let instance = self.instance.clone();
let metering_enabled = self.metering_enabled();
move || {
let store_guard =
tokio::runtime::Handle::current().block_on(store.lock());
let mut arg =
ffi::WasmArg::new(store_guard, instance, bytes, metering_enabled)
.unwrap();
let ptr = arg.get_ptr();
let len = arg.get_len();
fun.call(&mut arg.store(), ptr, len)
}
})
.await?;
#[cfg(feature = "metrics")]
{
METRICS
.exec
.handler
.record(&self.manifest.uid(), start.elapsed().as_millis() as f64);
}
if let Err(e) = res {
if self.metering_points_exhausted().await {
self.db.lock().await.revert_transaction().await?;
return Err(IndexerError::RunTimeLimitExceededError);
} else {
if let Some(e) = e.downcast_ref::<WasmIndexerError>() {
match e {
WasmIndexerError::KillSwitch => {
info!("Indexer({uid}) WASM execution terminated: {e}.")
}
_ => {
error!("Indexer({uid}) WASM execution failed: {e}.")
}
}
} else {
error!("Indexer({uid}) WASM execution failed: {e:?}.");
};
self.db.lock().await.revert_transaction().await?;
return Err(IndexerError::from(e));
}
} else {
if self.kill_switch.load(Ordering::SeqCst) {
self.db.lock().await.revert_transaction().await?;
} else {
self.db.lock().await.commit_transaction().await?;
}
}
Ok(())
}
fn kill_switch(&self) -> &Arc<AtomicBool> {
&self.kill_switch
}
fn manifest(&self) -> &Manifest {
&self.manifest
}
async fn get_error_message(&self) -> IndexerResult<String> {
let mut store = self.store.lock().await;
let result = ffi::get_error_message(&mut store, &self.instance)?;
Ok(result)
}
}