use self::adapters::BlockImporterAdapter;
use crate::{
combined_database::CombinedDatabase,
database::Database,
service::adapters::PoAAdapter,
};
use fuel_core_poa::ports::BlockImporter;
use fuel_core_services::{
RunnableService,
RunnableTask,
ServiceRunner,
State,
StateWatcher,
};
use fuel_core_storage::IsNotFound;
use std::net::SocketAddr;
use crate::service::sub_services::TxPoolSharedState;
pub use config::{
Config,
DbType,
RelayerConsensusConfig,
VMConfig,
};
pub use fuel_core_services::Service as ServiceTrait;
pub mod adapters;
pub mod config;
pub mod genesis;
pub mod metrics;
mod query;
pub mod sub_services;
pub mod vm_pool;
/// State exposed by the running service to its owner.
///
/// The struct is `Clone`; `FuelService::new` stores a clone of the value held
/// by the service runner.
#[derive(Clone)]
pub struct SharedState {
/// The PoA adapter.
pub poa_adapter: PoAAdapter,
/// The shared state of the transaction pool.
pub txpool_shared_state: TxPoolSharedState,
/// The shared state of the P2P service, if any.
/// Only compiled in with the `p2p` feature.
#[cfg(feature = "p2p")]
pub network: Option<fuel_core_p2p::service::SharedState>,
/// The shared state of the relayer, if any.
/// Only compiled in with the `relayer` feature.
#[cfg(feature = "relayer")]
pub relayer: Option<
fuel_core_relayer::SharedState<
Database<crate::database::database_description::relayer::Relayer>,
>,
>,
/// The shared state of the GraphQL API service. Its `bound_address` is what
/// `FuelService::bound_address` is initialized from.
pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState,
/// The combined database used by the service.
pub database: CombinedDatabase,
/// The block importer adapter; used during start-up to commit the result of
/// the genesis block execution.
pub block_importer: BlockImporterAdapter,
/// The config of the service.
pub config: Config,
}
/// Handle to the top-level service; wraps a `ServiceRunner` over [`Task`].
pub struct FuelService {
/// The runner that drives the [`Task`]; all `ServiceTrait` calls are
/// forwarded to it.
runner: ServiceRunner<Task>,
/// A clone of the runner's shared state, taken at construction time.
pub shared: SharedState,
/// The address copied from the GraphQL shared state at construction time.
pub bound_address: SocketAddr,
}
impl Drop for FuelService {
    /// Requests a stop of the underlying runner when the handle goes away.
    fn drop(&mut self) {
        // Only the stop request matters here; the flag returned by the runner
        // is intentionally discarded.
        let _ = self.runner.stop();
    }
}
impl FuelService {
    /// Builds a `FuelService` from a database and a config without starting it.
    ///
    /// The config is first passed through `Config::make_config_consistent`; a
    /// [`Task`] is then created and wrapped into a [`ServiceRunner`].
    ///
    /// # Errors
    ///
    /// Returns the error produced by [`Task::new`], if any.
    #[tracing::instrument(skip_all, fields(name = %config.name))]
    pub fn new(database: CombinedDatabase, config: Config) -> anyhow::Result<Self> {
        let task = Task::new(database, config.make_config_consistent())?;
        let runner = ServiceRunner::new(task);

        Ok(Self {
            // The address is read from the GraphQL part of the shared state.
            bound_address: runner.shared.graph_ql.bound_address,
            shared: runner.shared.clone(),
            runner,
        })
    }

    /// Creates the combined database described by `config.combined_db_config`
    /// and then builds and starts the service on top of it.
    ///
    /// # Errors
    ///
    /// Fails if the database cannot be created or if
    /// [`Self::from_combined_database`] fails.
    pub async fn new_node(config: Config) -> anyhow::Result<Self> {
        let database = CombinedDatabase::from_config(&config.combined_db_config)?;
        Self::from_combined_database(database, config).await
    }

    /// Builds and starts the service on top of an existing `database`.
    ///
    /// The remaining parts of the [`CombinedDatabase`] are filled with their
    /// `Default` values.
    pub async fn from_database(
        database: Database,
        config: Config,
    ) -> anyhow::Result<Self> {
        Self::from_combined_database(
            CombinedDatabase::new(database, Default::default(), Default::default()),
            config,
        )
        .await
    }

    /// Builds the service from `combined_database` and waits until it has
    /// started.
    ///
    /// # Errors
    ///
    /// Fails if construction or start-up fails, or if the state reported after
    /// start-up is not a started one.
    pub async fn from_combined_database(
        combined_database: CombinedDatabase,
        config: Config,
    ) -> anyhow::Result<Self> {
        let service = Self::new(combined_database, config)?;
        let state = service.runner.start_and_await().await?;

        anyhow::ensure!(
            state.started(),
            "The state of the service is not started: {state:?}"
        );

        Ok(service)
    }

    /// Waits for the relayer to report that it is synced.
    ///
    /// Returns immediately with `Ok(())` when the shared state holds no relayer.
    #[cfg(feature = "relayer")]
    pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
        match &self.runner.shared.relayer {
            Some(relayer_handle) => {
                relayer_handle.await_synced().await?;
                Ok(())
            }
            None => Ok(()),
        }
    }
}
// `FuelService` implements the service trait by forwarding every call to the
// inner `ServiceRunner<Task>`; no additional logic is added here.
#[async_trait::async_trait]
impl ServiceTrait for FuelService {
/// Forwards to the runner's `start`.
fn start(&self) -> anyhow::Result<()> {
self.runner.start()
}
/// Forwards to the runner's `start_and_await` and returns the state it reports.
async fn start_and_await(&self) -> anyhow::Result<State> {
self.runner.start_and_await().await
}
/// Forwards to the runner's `await_start_or_stop`.
async fn await_start_or_stop(&self) -> anyhow::Result<State> {
self.runner.await_start_or_stop().await
}
/// Forwards to the runner's `stop`; this is also what `Drop` ends up calling.
fn stop(&self) -> bool {
self.runner.stop()
}
/// Forwards to the runner's `stop_and_await` and returns the state it reports.
async fn stop_and_await(&self) -> anyhow::Result<State> {
self.runner.stop_and_await().await
}
/// Forwards to the runner's `await_stop`.
async fn await_stop(&self) -> anyhow::Result<State> {
self.runner.await_stop().await
}
/// Returns the runner's current state.
fn state(&self) -> State {
self.runner.state()
}
/// Returns a state watcher obtained from the runner.
fn state_watcher(&self) -> StateWatcher {
self.runner.state_watcher()
}
}
/// The list of boxed sub-services that a [`Task`] starts, watches and stops.
pub type SubServices = Vec<Box<dyn ServiceTrait + Send + Sync + 'static>>;
/// The runnable unit behind `FuelService`: a set of sub-services plus the
/// state shared with the outside.
pub struct Task {
/// The sub-services started in `into_task` and stopped in `shutdown`.
services: SubServices,
/// The shared state handed out through `RunnableService::shared_data`.
pub shared: SharedState,
}
impl Task {
    /// Creates a task from the given database and config.
    ///
    /// The database version is checked first; the sub-services and the shared
    /// state are then created by `sub_services::init_sub_services`.
    ///
    /// # Errors
    ///
    /// Fails if the version check or the sub-service initialization fails.
    pub fn new(database: CombinedDatabase, config: Config) -> anyhow::Result<Task> {
        tracing::info!("Initializing database");
        database.check_version()?;

        tracing::info!("Initializing sub services");
        let (services, shared) = sub_services::init_sub_services(&config, database)?;

        Ok(Self { services, shared })
    }

    /// Gives tests mutable access to the list of sub-services.
    #[cfg(test)]
    pub fn sub_services(&mut self) -> &mut SubServices {
        &mut self.services
    }
}
#[async_trait::async_trait]
impl RunnableService for Task {
const NAME: &'static str = "FuelService";
type SharedData = SharedState;
type Task = Task;
type TaskParams = ();
fn shared_data(&self) -> Self::SharedData {
self.shared.clone()
}
async fn into_task(
self,
watcher: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
if let Err(err) = self.shared.database.on_chain().get_genesis() {
if err.is_not_found() {
let result = genesis::execute_genesis_block(
watcher.clone(),
&self.shared.config,
&self.shared.database,
)
.await?;
self.shared.block_importer.commit_result(result).await?;
}
}
for service in &self.services {
service.start_and_await().await?;
}
Ok(self)
}
}
#[async_trait::async_trait]
impl RunnableTask for Task {
    /// Waits until any sub-service stops or the watcher's `while_started`
    /// future completes, whichever happens first.
    ///
    /// An error from the completed future is only logged. The method always
    /// returns `Ok(false)`, i.e. the task never asks to be run again.
    #[tracing::instrument(skip_all)]
    async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
        // One stop future per sub-service, plus one for this task's own watcher.
        let mut stop_signals: Vec<_> = self
            .services
            .iter()
            .map(|service| service.await_stop())
            .collect();
        stop_signals.push(Box::pin(watcher.while_started()));

        let (result, _, _) = futures::future::select_all(stop_signals).await;

        if let Err(err) = result {
            tracing::error!("Got an error during listen for shutdown: {}", err);
        }

        // A stop signal arrived from one of the sources, so this task must not
        // continue.
        let should_continue = false;
        Ok(should_continue)
    }

    /// Stops every sub-service in order, awaiting each one.
    ///
    /// Failures are logged and do not prevent the remaining sub-services from
    /// being stopped; the method itself always returns `Ok(())`.
    async fn shutdown(self) -> anyhow::Result<()> {
        for service in self.services {
            if let Err(err) = service.stop_and_await().await {
                tracing::error!(
                    "Got an error during awaiting for stop of the service: {}",
                    err
                );
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use crate::service::{
Config,
Task,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
State,
};
use std::{
thread::sleep,
time::Duration,
};
// Checks, for every sub-service index `i`, that stopping the `i`-th
// sub-service makes `Task::run` return `false`. A fresh task is built on each
// iteration of the loop.
#[tokio::test]
async fn run_start_and_stop() {
let mut i = 0;
loop {
let task = Task::new(Default::default(), Config::local_node()).unwrap();
// The sender half of the channel is dropped right away; only the receiver
// is turned into the watcher.
let (_, receiver) = tokio::sync::watch::channel(State::NotStarted);
let mut watcher = receiver.into();
let mut task = task.into_task(&watcher, ()).await.unwrap();
// NOTE(review): this is `std::thread::sleep`, which blocks the thread
// rather than yielding to the async runtime. Confirm that is intended,
// or consider an async sleep instead.
sleep(Duration::from_secs(1));
for service in task.sub_services() {
assert_eq!(service.state(), State::Started);
}
if i < task.sub_services().len() {
// Stop exactly one sub-service; `run` is then expected to return `false`.
task.sub_services()[i].stop_and_await().await.unwrap();
assert!(!task.run(&mut watcher).await.unwrap());
} else {
break;
}
i += 1;
}
// At this point `i` equals the number of sub-services. The expected count is
// hard-coded and must be updated when sub-services are added or removed.
#[allow(unused_mut)]
let mut expected_services = 5;
// Two more sub-services are expected when the `p2p` feature is enabled.
#[cfg(feature = "p2p")]
{
expected_services += 2;
}
assert_eq!(i, expected_services);
}
// Checks that `Task::shutdown` leaves every sub-service in `State::Stopped`.
#[tokio::test]
async fn shutdown_stops_all_services() {
let task = Task::new(Default::default(), Config::local_node()).unwrap();
let mut task = task.into_task(&Default::default(), ()).await.unwrap();
// The watchers are collected up front because `shutdown` consumes the task.
let sub_services_watchers: Vec<_> = task
.sub_services()
.iter()
.map(|s| s.state_watcher())
.collect();
// NOTE(review): blocking `std::thread::sleep` inside an async test; see the
// note in `run_start_and_stop`.
sleep(Duration::from_secs(1));
for service in task.sub_services() {
assert_eq!(service.state(), State::Started);
}
task.shutdown().await.unwrap();
for mut service in sub_services_watchers {
assert_eq!(service.borrow_and_update().clone(), State::Stopped);
}
}
}