use crate::{
database::Database,
service::adapters::P2PAdapter,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
ServiceRunner,
State,
StateWatcher,
};
use std::net::SocketAddr;
use tracing::log::warn;
pub use config::{
Config,
DbType,
VMConfig,
};
pub use fuel_core_services::Service as ServiceTrait;
pub use fuel_core_consensus_module::RelayerVerifierConfig;
use self::adapters::BlockImporterAdapter;
pub mod adapters;
pub mod config;
pub mod genesis;
pub mod metrics;
pub mod sub_services;
#[derive(Clone)]
pub struct SharedState {
pub txpool: fuel_core_txpool::service::SharedState<P2PAdapter, Database>,
#[cfg(feature = "p2p")]
pub network: Option<fuel_core_p2p::service::SharedState>,
#[cfg(feature = "relayer")]
pub relayer: Option<fuel_core_relayer::SharedState<Database>>,
pub graph_ql: crate::fuel_core_graphql_api::service::SharedState,
pub block_importer: BlockImporterAdapter,
#[cfg(feature = "test-helpers")]
pub config: Config,
}
pub struct FuelService {
runner: ServiceRunner<Task>,
pub shared: SharedState,
pub bound_address: SocketAddr,
}
impl FuelService {
#[tracing::instrument(skip_all, fields(name = %config.name))]
pub fn new(database: Database, mut config: Config) -> anyhow::Result<Self> {
database.init(&config.chain_conf)?;
Self::make_config_consistent(&mut config);
let task = Task::new(database, config)?;
let runner = ServiceRunner::new(task);
let shared = runner.shared.clone();
let bound_address = runner.shared.graph_ql.bound_address;
Ok(FuelService {
bound_address,
shared,
runner,
})
}
pub async fn new_node(config: Config) -> anyhow::Result<Self> {
let database = match config.database_type {
#[cfg(feature = "rocksdb")]
DbType::RocksDb => {
if config.database_path.as_os_str().is_empty() {
warn!(
"No RocksDB path configured, initializing database with a tmp directory"
);
Database::default()
} else {
Database::open(&config.database_path, config.max_database_cache_size)?
}
}
DbType::InMemory => Database::in_memory(),
#[cfg(not(feature = "rocksdb"))]
_ => Database::in_memory(),
};
Self::from_database(database, config).await
}
pub async fn from_database(
database: Database,
config: Config,
) -> anyhow::Result<Self> {
let service = Self::new(database, config)?;
service.runner.start_and_await().await?;
Ok(service)
}
#[cfg(feature = "relayer")]
pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
if let Some(relayer_handle) = &self.runner.shared.relayer {
relayer_handle.await_synced().await?;
}
Ok(())
}
fn make_config_consistent(config: &mut Config) {
if config.txpool.chain_config != config.chain_conf {
warn!("The `ChainConfig` of `TxPool` was inconsistent");
config.txpool.chain_config = config.chain_conf.clone();
}
if config.txpool.utxo_validation != config.utxo_validation {
warn!("The `utxo_validation` of `TxPool` was inconsistent");
config.txpool.utxo_validation = config.utxo_validation;
}
if config.block_producer.utxo_validation != config.utxo_validation {
warn!("The `utxo_validation` of `BlockProducer` was inconsistent");
config.block_producer.utxo_validation = config.utxo_validation;
}
}
}
#[async_trait::async_trait]
impl ServiceTrait for FuelService {
fn start(&self) -> anyhow::Result<()> {
self.runner.start()
}
async fn start_and_await(&self) -> anyhow::Result<State> {
self.runner.start_and_await().await
}
async fn await_start_or_stop(&self) -> anyhow::Result<State> {
self.runner.await_start_or_stop().await
}
fn stop(&self) -> bool {
self.runner.stop()
}
async fn stop_and_await(&self) -> anyhow::Result<State> {
self.runner.stop_and_await().await
}
async fn await_stop(&self) -> anyhow::Result<State> {
self.runner.await_stop().await
}
fn state(&self) -> State {
self.runner.state()
}
fn state_watcher(&self) -> StateWatcher {
self.runner.state_watcher()
}
}
pub type SubServices = Vec<Box<dyn ServiceTrait + Send + Sync + 'static>>;
pub struct Task {
services: SubServices,
pub shared: SharedState,
}
impl Task {
pub fn new(database: Database, config: Config) -> anyhow::Result<Task> {
genesis::maybe_initialize_state(&config, &database)?;
let (services, shared) = sub_services::init_sub_services(&config, &database)?;
Ok(Task { services, shared })
}
#[cfg(test)]
pub fn sub_services(&mut self) -> &mut SubServices {
&mut self.services
}
}
#[async_trait::async_trait]
impl RunnableService for Task {
const NAME: &'static str = "FuelService";
type SharedData = SharedState;
type Task = Task;
fn shared_data(&self) -> Self::SharedData {
self.shared.clone()
}
async fn into_task(self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
for service in &self.services {
service.start_and_await().await?;
}
Ok(self)
}
}
#[async_trait::async_trait]
impl RunnableTask for Task {
#[tracing::instrument(skip_all)]
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let mut stop_signals = vec![];
for service in &self.services {
stop_signals.push(service.await_stop())
}
stop_signals.push(Box::pin(watcher.while_started()));
let (result, _, _) = futures::future::select_all(stop_signals).await;
if let Err(err) = result {
tracing::error!("Got an error during listen for shutdown: {}", err);
}
let should_continue = false;
Ok(should_continue)
}
async fn shutdown(self) -> anyhow::Result<()> {
for service in self.services {
let result = service.stop_and_await().await;
if let Err(err) = result {
tracing::error!(
"Got and error during awaiting for stop of the service: {}",
err
);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::service::{
Config,
Task,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
State,
};
use std::{
thread::sleep,
time::Duration,
};
#[tokio::test]
async fn run_start_and_stop() {
let mut i = 0;
loop {
let task = Task::new(Default::default(), Config::local_node()).unwrap();
let (_, receiver) = tokio::sync::watch::channel(State::NotStarted);
let mut watcher = receiver.into();
let mut task = task.into_task(&watcher).await.unwrap();
sleep(Duration::from_secs(1));
for service in task.sub_services() {
assert_eq!(service.state(), State::Started);
}
if i < task.sub_services().len() {
task.sub_services()[i].stop_and_await().await.unwrap();
assert!(!task.run(&mut watcher).await.unwrap());
} else {
break
}
i += 1;
}
#[allow(unused_mut)]
let mut expected_services = 3;
#[cfg(feature = "p2p")]
{
expected_services += 1;
}
assert_eq!(i, expected_services);
}
#[tokio::test]
async fn shutdown_stops_all_services() {
let task = Task::new(Default::default(), Config::local_node()).unwrap();
let mut task = task.into_task(&Default::default()).await.unwrap();
let sub_services_watchers: Vec<_> = task
.sub_services()
.iter()
.map(|s| s.state_watcher())
.collect();
sleep(Duration::from_secs(1));
for service in task.sub_services() {
assert_eq!(service.state(), State::Started);
}
task.shutdown().await.unwrap();
for mut service in sub_services_watchers {
assert_eq!(service.borrow_and_update().clone(), State::Stopped);
}
}
}